diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 597d2252..0a4b9dfa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -58,11 +58,11 @@ jobs: - name: Build project env: - ALPAKKA_S3_AWS_CREDENTIALS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY }} - ALPAKKA_S3_REGION_DEFAULT_REGION: us-west-2 - ALPAKKA_S3_AWS_CREDENTIALS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY }} - ALPAKKA_S3_AWS_CREDENTIALS_PROVIDER: static - ALPAKKA_S3_REGION_PROVIDER: static + PEKKO_CONNECTORS_S3_AWS_CREDENTIALS_PROVIDER: static + PEKKO_CONNECTORS_S3_REGION_DEFAULT_REGION: us-west-2 + PEKKO_CONNECTORS_S3_AWS_CREDENTIALS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY }} + PEKKO_CONNECTORS_S3_AWS_CREDENTIALS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_KEY }} + PEKKO_CONNECTORS_S3_REGION_PROVIDER: static run: sbt ++${{ matrix.scala }} clean coverage test - name: Compile docs diff --git a/.scala-steward.conf b/.scala-steward.conf index e4e79e8d..18089335 100644 --- a/.scala-steward.conf +++ b/.scala-steward.conf @@ -1,9 +1 @@ updatePullRequests = "always" - -# https://www.lightbend.com/blog/why-we-are-changing-the-license-for-akka -updates.pin = [{ groupId = "com.typesafe.akka", version = "2.6." }] -updates.ignore = [ - { groupId = "com.typesafe.akka" }, - { groupId = "com.lightbend.akka" }, - { groupId = "com.lightbend.akka.grpc" } -] diff --git a/backup-gcs/src/main/scala/io/aiven/guardian/kafka/backup/gcs/BackupClient.scala b/backup-gcs/src/main/scala/io/aiven/guardian/kafka/backup/gcs/BackupClient.scala index 10533bfb..33f5e1ed 100644 --- a/backup-gcs/src/main/scala/io/aiven/guardian/kafka/backup/gcs/BackupClient.scala +++ b/backup-gcs/src/main/scala/io/aiven/guardian/kafka/backup/gcs/BackupClient.scala @@ -1,23 +1,25 @@ package io.aiven.guardian.kafka.backup.gcs -import akka.actor.ActorSystem -import akka.http.scaladsl.model.ContentTypes -import akka.stream.alpakka.google.GoogleAttributes -import akka.stream.alpakka.google.GoogleSettings -import akka.stream.alpakka.googlecloud.storage.StorageObject -import akka.stream.alpakka.googlecloud.storage.scaladsl.GCStorage -import akka.stream.scaladsl.Sink -import akka.util.ByteString import io.aiven.guardian.kafka.backup.BackupClientInterface import io.aiven.guardian.kafka.backup.KafkaConsumerInterface import io.aiven.guardian.kafka.backup.configs.Backup import io.aiven.guardian.kafka.gcs.configs.{GCS => GCSConfig} +import org.apache.pekko import scala.concurrent.ExecutionContext import scala.concurrent.Future +import pekko.actor.ActorSystem +import pekko.http.scaladsl.model.ContentTypes +import pekko.stream.connectors.google.GoogleAttributes +import pekko.stream.connectors.google.GoogleSettings +import pekko.stream.connectors.googlecloud.storage.StorageObject +import pekko.stream.connectors.googlecloud.storage.scaladsl.GCStorage +import pekko.stream.scaladsl.Sink +import pekko.util.ByteString + // TODO: GCS implementation currently does not work correctly because of inability of current GCS implementation in -// Alpakka to allow us to commit Kafka cursor whenever chunks are uploaded +// Pekko Connectors to allow us to commit Kafka cursor whenever chunks are uploaded class BackupClient[T <: KafkaConsumerInterface](maybeGoogleSettings: Option[GoogleSettings])(implicit override val kafkaClientInterface: T, override val backupConfig: Backup, diff --git a/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala b/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala index 246dc090..f7c04ed1 100644 
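As an aside on the renamed CI variables above: Pekko Connectors S3 resolves the `PEKKO_CONNECTORS_S3_*` overrides through its reference.conf, and with both providers set to `static` the effective settings correspond roughly to the programmatic configuration sketched below. This is a minimal sketch, assuming the standard AWS SDK v2 credential classes; the actor-system name and the literal key values are placeholders, not values from this PR.

```scala
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.connectors.s3.{S3Ext, S3Settings}
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.regions.providers.AwsRegionProvider

implicit val system: ActorSystem = ActorSystem("sketch") // placeholder name

// Equivalent of PEKKO_CONNECTORS_S3_AWS_CREDENTIALS_PROVIDER=static with the
// access key id/secret, plus PEKKO_CONNECTORS_S3_REGION_PROVIDER=static with
// PEKKO_CONNECTORS_S3_REGION_DEFAULT_REGION=us-west-2.
val s3Settings: S3Settings = S3Ext(system).settings
  .withCredentialsProvider(
    StaticCredentialsProvider.create(
      AwsBasicCredentials.create("<access-key-id>", "<secret-access-key>") // placeholders
    )
  )
  .withS3RegionProvider(new AwsRegionProvider {
    override def getRegion: Region = Region.US_WEST_2
  })
```

Passing such settings explicitly (as the `maybeS3Settings` constructor parameters below do) and letting the env-var overrides drive the defaults are two views of the same configuration surface, which is why only the variable prefix changes in this migration.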
--- a/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala +++ b/backup-s3/src/main/scala/io/aiven/guardian/kafka/backup/s3/BackupClient.scala @@ -1,19 +1,12 @@ package io.aiven.guardian.kafka.backup.s3 -import akka.Done -import akka.NotUsed -import akka.actor.ActorSystem -import akka.stream.SinkShape -import akka.stream.alpakka.s3._ -import akka.stream.alpakka.s3.scaladsl.S3 -import akka.stream.scaladsl._ -import akka.util.ByteString import com.typesafe.scalalogging.LazyLogging import io.aiven.guardian.kafka.backup.BackupClientInterface import io.aiven.guardian.kafka.backup.KafkaConsumerInterface import io.aiven.guardian.kafka.backup.configs.Backup import io.aiven.guardian.kafka.models.BackupObjectMetadata import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config} +import org.apache.pekko import scala.collection.immutable import scala.concurrent.ExecutionContext @@ -21,6 +14,15 @@ import scala.concurrent.Future import java.time.Instant +import pekko.Done +import pekko.NotUsed +import pekko.actor.ActorSystem +import pekko.stream.SinkShape +import pekko.stream.connectors.s3._ +import pekko.stream.connectors.s3.scaladsl.S3 +import pekko.stream.scaladsl._ +import pekko.util.ByteString + class BackupClient[T <: KafkaConsumerInterface](maybeS3Settings: Option[S3Settings])(implicit override val kafkaClientInterface: T, override val backupConfig: Backup, @@ -126,7 +128,7 @@ class BackupClient[T <: KafkaConsumerInterface](maybeS3Settings: Option[S3Settin maybeParts <- getPartsFromUpload(key, uploadId) } yield maybeParts.map { parts => val finalParts = parts.lastOption match { - case Some(part) if part.size >= akka.stream.alpakka.s3.scaladsl.S3.MinChunkSize => + case Some(part) if part.size >= pekko.stream.connectors.s3.scaladsl.S3.MinChunkSize => parts case _ => // We drop the last part here since its broken @@ -199,7 +201,7 @@ class BackupClient[T <: KafkaConsumerInterface](maybeS3Settings: Option[S3Settin private[s3] def kafkaBatchSink : Sink[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext]), NotUsed] = - // See https://doc.akka.io/docs/akka/current/stream/operators/Partition.html for an explanation on Partition + // See https://pekko.apache.org/docs/pekko/current/stream/operators/Partition.html for an explanation on Partition Sink.fromGraph(GraphDSL.create() { implicit builder => import GraphDSL.Implicits._ val partition = builder.add( diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientChunkState.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientChunkState.scala index c132a2fe..ef26d632 100644 --- a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientChunkState.scala +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientChunkState.scala @@ -1,21 +1,23 @@ package io.aiven.guardian.kafka.backup.s3 -import akka.Done -import akka.actor.ActorSystem -import akka.stream.alpakka.s3.S3Headers -import akka.stream.alpakka.s3.S3Settings -import akka.stream.alpakka.s3.SuccessfulUploadPart -import akka.stream.scaladsl.Flow -import akka.stream.scaladsl.Sink import io.aiven.guardian.kafka.backup.KafkaConsumerInterface import io.aiven.guardian.kafka.backup.configs.Backup import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config} +import org.apache.pekko import scala.collection.immutable import scala.concurrent.Future import java.util.concurrent.ConcurrentLinkedQueue +import pekko.Done +import pekko.actor.ActorSystem +import 
pekko.stream.connectors.s3.S3Headers +import pekko.stream.connectors.s3.S3Settings +import pekko.stream.connectors.s3.SuccessfulUploadPart +import pekko.stream.scaladsl.Flow +import pekko.stream.scaladsl.Sink + class BackupClientChunkState[T <: KafkaConsumerInterface](maybeS3Settings: Option[S3Settings])(implicit override val kafkaClientInterface: T, override val backupConfig: Backup, diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientSpec.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientSpec.scala index 2cefbb8b..3256cc25 100644 --- a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientSpec.scala +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/BackupClientSpec.scala @@ -1,9 +1,5 @@ package io.aiven.guardian.kafka.backup.s3 -import akka.stream.alpakka.s3.scaladsl.S3 -import akka.stream.scaladsl.Keep -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source import com.softwaremill.diffx.generic.auto._ import com.softwaremill.diffx.scalatest.DiffMustMatcher._ import com.typesafe.scalalogging.LazyLogging @@ -14,7 +10,8 @@ import io.aiven.guardian.kafka.models.ReducedConsumerRecord import io.aiven.guardian.kafka.s3.Generators._ import io.aiven.guardian.kafka.s3.S3Spec import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config} -import org.mdedetrich.akka.stream.support.CirceStreamSupport +import org.apache.pekko +import org.mdedetrich.pekko.stream.support.CirceStreamSupport import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.must.Matchers @@ -25,6 +22,11 @@ import scala.language.postfixOps import java.time.OffsetDateTime +import pekko.stream.connectors.s3.scaladsl.S3 +import pekko.stream.scaladsl.Keep +import pekko.stream.scaladsl.Sink +import pekko.stream.scaladsl.Source + trait BackupClientSpec extends S3Spec with Matchers with BeforeAndAfterAll with LazyLogging { val ThrottleElements: Int = 100 @@ -50,7 +52,7 @@ trait BackupClientSpec extends S3Spec with Matchers with BeforeAndAfterAll with val calculatedFuture = for { _ <- createBucket(s3Config.dataBucket) _ <- backupClient.backup.run() - _ <- akka.pattern.after(delay)(Future.successful(())) + _ <- pekko.pattern.after(delay)(Future.successful(())) bucketContents <- S3.listBucket(s3Config.dataBucket, None, s3Headers) .withAttributes(s3Attrs) diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/KafkaConsumerWithKillSwitch.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/KafkaConsumerWithKillSwitch.scala index 971cf358..4221036b 100644 --- a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/KafkaConsumerWithKillSwitch.scala +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/KafkaConsumerWithKillSwitch.scala @@ -1,16 +1,18 @@ package io.aiven.guardian.kafka.backup.s3 -import akka.actor.ActorSystem -import akka.kafka.CommitterSettings -import akka.kafka.ConsumerMessage -import akka.kafka.ConsumerSettings -import akka.kafka.scaladsl.Consumer -import akka.stream.SharedKillSwitch -import akka.stream.scaladsl.SourceWithContext import io.aiven.guardian.kafka.backup.KafkaConsumer import io.aiven.guardian.kafka.backup.configs.Backup import io.aiven.guardian.kafka.configs.KafkaCluster import io.aiven.guardian.kafka.models.ReducedConsumerRecord +import org.apache.pekko + +import pekko.actor.ActorSystem +import pekko.kafka.CommitterSettings +import pekko.kafka.ConsumerMessage +import pekko.kafka.ConsumerSettings +import pekko.kafka.scaladsl.Consumer +import 
pekko.stream.SharedKillSwitch +import pekko.stream.scaladsl.SourceWithContext class KafkaConsumerWithKillSwitch( configureConsumer: Option[ diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MinioBackupClientSpec.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MinioBackupClientSpec.scala index d733e91d..f4df7d39 100644 --- a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MinioBackupClientSpec.scala +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MinioBackupClientSpec.scala @@ -1,8 +1,8 @@ package io.aiven.guardian.kafka.backup.s3 -import akka.actor.ActorSystem -import io.aiven.guardian.akka.AnyPropTestKit import io.aiven.guardian.kafka.s3.MinioS3Test +import io.aiven.guardian.pekko.AnyPropTestKit +import org.apache.pekko.actor.ActorSystem class MinioBackupClientSpec extends AnyPropTestKit(ActorSystem("MinioS3BackupClientSpec")) diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedKafkaClientBackupConsumerSpec.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedKafkaClientBackupConsumerSpec.scala index 6cdb8153..633f7230 100644 --- a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedKafkaClientBackupConsumerSpec.scala +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedKafkaClientBackupConsumerSpec.scala @@ -1,12 +1,6 @@ package io.aiven.guardian.kafka.backup.s3 -import akka.actor.ActorSystem -import akka.stream.alpakka.s3.S3Settings -import akka.stream.alpakka.s3.scaladsl.S3 -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source import com.softwaremill.diffx.scalatest.DiffMustMatcher._ -import io.aiven.guardian.akka.AnyPropTestKit import io.aiven.guardian.kafka.Generators._ import io.aiven.guardian.kafka.Utils import io.aiven.guardian.kafka.backup.MockedBackupClientInterface @@ -18,7 +12,9 @@ import io.aiven.guardian.kafka.models.ReducedConsumerRecord import io.aiven.guardian.kafka.s3.Generators._ import io.aiven.guardian.kafka.s3.S3Spec import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config} -import org.mdedetrich.akka.stream.support.CirceStreamSupport +import io.aiven.guardian.pekko.AnyPropTestKit +import org.apache.pekko +import org.mdedetrich.pekko.stream.support.CirceStreamSupport import org.scalatest.matchers.must.Matchers import scala.concurrent.ExecutionContext @@ -27,6 +23,12 @@ import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration._ import scala.language.postfixOps +import pekko.actor.ActorSystem +import pekko.stream.connectors.s3.S3Settings +import pekko.stream.connectors.s3.scaladsl.S3 +import pekko.stream.scaladsl.Sink +import pekko.stream.scaladsl.Source + class MockedKafkaClientBackupConsumerSpec extends AnyPropTestKit(ActorSystem("MockedKafkaClientBackupClientSpec")) with S3Spec @@ -70,7 +72,7 @@ class MockedKafkaClientBackupConsumerSpec val calculatedFuture = for { _ <- createBucket(s3Config.dataBucket) _ = backupClient.backup.run() - bucketContents <- akka.pattern.after(10 seconds)( + bucketContents <- pekko.pattern.after(10 seconds)( S3.listBucket(s3Config.dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq) ) keysSorted = bucketContents.map(_.key).sortBy(Utils.keyToOffsetDateTime) diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala index 42736771..2ab42eca 100644 --- 
a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/MockedS3BackupClientInterface.scala @@ -1,20 +1,22 @@ package io.aiven.guardian.kafka.backup.s3 -import akka.NotUsed -import akka.actor.ActorSystem -import akka.stream.alpakka.s3.S3Headers -import akka.stream.alpakka.s3.S3Settings -import akka.stream.scaladsl.Source import io.aiven.guardian.kafka.backup.MockedBackupClientInterface import io.aiven.guardian.kafka.backup.MockedKafkaConsumerInterface import io.aiven.guardian.kafka.backup.configs.Backup import io.aiven.guardian.kafka.backup.configs.TimeConfiguration import io.aiven.guardian.kafka.models.ReducedConsumerRecord import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config} +import org.apache.pekko import scala.concurrent.duration._ import scala.language.postfixOps +import pekko.NotUsed +import pekko.actor.ActorSystem +import pekko.stream.connectors.s3.S3Headers +import pekko.stream.connectors.s3.S3Settings +import pekko.stream.scaladsl.Source + class MockedS3BackupClientInterface( kafkaData: Source[ReducedConsumerRecord, NotUsed], timeConfiguration: TimeConfiguration, diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala index 9c2526b0..17b973a1 100644 --- a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientSpec.scala @@ -1,8 +1,8 @@ package io.aiven.guardian.kafka.backup.s3 -import akka.actor.ActorSystem -import io.aiven.guardian.akka.AnyPropTestKit import io.aiven.guardian.kafka.backup.configs.Compression +import io.aiven.guardian.pekko.AnyPropTestKit +import org.apache.pekko.actor.ActorSystem class RealS3BackupClientSpec extends AnyPropTestKit(ActorSystem("RealS3BackupClientSpec")) with RealS3BackupClientTest { override val compression: Option[Compression] = None diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientTest.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientTest.scala index d7f1fd65..61104737 100644 --- a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientTest.scala +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3BackupClientTest.scala @@ -1,12 +1,5 @@ package io.aiven.guardian.kafka.backup.s3 -import akka.kafka.scaladsl.Producer -import akka.stream.KillSwitches -import akka.stream.SharedKillSwitch -import akka.stream.alpakka.s3.S3Settings -import akka.stream.alpakka.s3.scaladsl.S3 -import akka.stream.scaladsl.Compression -import akka.stream.scaladsl.Sink import com.softwaremill.diffx.scalatest.DiffMustMatcher._ import io.aiven.guardian.kafka.Generators._ import io.aiven.guardian.kafka.KafkaClusterTest @@ -24,7 +17,8 @@ import io.aiven.guardian.kafka.models.Gzip import io.aiven.guardian.kafka.models.ReducedConsumerRecord import io.aiven.guardian.kafka.s3.Generators.s3ConfigGen import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config} -import org.mdedetrich.akka.stream.support.CirceStreamSupport +import org.apache.pekko +import org.mdedetrich.pekko.stream.support.CirceStreamSupport import org.scalatest.propspec.AnyPropSpecLike import scala.concurrent.Future @@ -33,6 +27,14 @@ import scala.language.postfixOps import java.time.temporal.ChronoUnit +import pekko.kafka.scaladsl.Producer +import 
pekko.stream.KillSwitches +import pekko.stream.SharedKillSwitch +import pekko.stream.connectors.s3.S3Settings +import pekko.stream.connectors.s3.scaladsl.S3 +import pekko.stream.scaladsl.Compression +import pekko.stream.scaladsl.Sink + trait RealS3BackupClientTest extends AnyPropSpecLike with KafkaClusterTest with BackupClientSpec { def compression: Option[CompressionConfig] @@ -78,9 +80,9 @@ trait RealS3BackupClientTest extends AnyPropSpecLike with KafkaClusterTest with delay: FiniteDuration = 5 seconds ): Future[Unit] = if (backupClient.processedChunks.size() > 0) - akka.pattern.after(delay)(Future.successful(())) + pekko.pattern.after(delay)(Future.successful(())) else - akka.pattern.after(step)(waitUntilBackupClientHasCommitted(backupClient, step, delay)) + pekko.pattern.after(step)(waitUntilBackupClientHasCommitted(backupClient, step, delay)) private def downloadObject(dataBucket: String, key: String) = { val downloadSource = S3 @@ -134,7 +136,7 @@ trait RealS3BackupClientTest extends AnyPropSpecLike with KafkaClusterTest with _ <- createTopics(topics) _ <- createBucket(s3Config.dataBucket) _ = backupClientWrapped.run() - _ <- akka.pattern.after(KafkaInitializationTimeoutConstant)( + _ <- pekko.pattern.after(KafkaInitializationTimeoutConstant)( baseSource .runWith(Producer.plainSink(producerSettings)) ) @@ -224,7 +226,7 @@ trait RealS3BackupClientTest extends AnyPropSpecLike with KafkaClusterTest with _ = baseSource.runWith(Producer.plainSink(producerSettings)) _ <- waitUntilBackupClientHasCommitted(backupClient) _ = killSwitch.abort(TerminationException) - _ <- akka.pattern.after(2 seconds) { + _ <- pekko.pattern.after(2 seconds) { Future { secondBackupClientWrapped.run() } @@ -345,7 +347,7 @@ trait RealS3BackupClientTest extends AnyPropSpecLike with KafkaClusterTest with _ = baseSource.runWith(Producer.plainSink(producerSettings)) _ <- waitUntilBackupClientHasCommitted(backupClient) _ = killSwitch.abort(TerminationException) - _ <- akka.pattern.after(2 seconds) { + _ <- pekko.pattern.after(2 seconds) { Future { secondBackupClientWrapped.run() } @@ -423,13 +425,13 @@ trait RealS3BackupClientTest extends AnyPropSpecLike with KafkaClusterTest with _ <- createTopics(topics) _ <- createBucket(s3Config.dataBucket) _ = backupClientWrapped.run() - _ <- akka.pattern.after(KafkaInitializationTimeoutConstant)( + _ <- pekko.pattern.after(KafkaInitializationTimeoutConstant)( baseSource .runWith(Producer.plainSink(producerSettings)) ) _ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head) - bucketContents <- akka.pattern.after(10 seconds)( + bucketContents <- pekko.pattern.after(10 seconds)( S3.listBucket(s3Config.dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq) ) keysSorted = bucketContents.map(_.key).sortBy(Utils.keyToOffsetDateTime) @@ -537,7 +539,7 @@ trait RealS3BackupClientTest extends AnyPropSpecLike with KafkaClusterTest with _ <- createBucket(secondS3Config.dataBucket) _ = backupClientOneWrapped.run() _ = backupClientTwoWrapped.run() - _ <- akka.pattern.after(KafkaInitializationTimeoutConstant)( + _ <- pekko.pattern.after(KafkaInitializationTimeoutConstant)( baseSource .runWith(Producer.plainSink(producerSettings)) ) @@ -662,14 +664,14 @@ trait RealS3BackupClientTest extends AnyPropSpecLike with KafkaClusterTest with _ <- createBucket(secondS3Config.dataBucket) _ = backupClientOneWrapped.run() _ = backupClientTwoWrapped.run() - _ <- akka.pattern.after(KafkaInitializationTimeoutConstant)( + _ <- pekko.pattern.after(KafkaInitializationTimeoutConstant)( baseSource 
.runWith(Producer.plainSink(producerSettings)) ) _ <- sendTopicAfterTimePeriod(1 minute, producerSettings, topics.head) (bucketContentsOne, bucketContentsTwo) <- - akka.pattern.after(10 seconds)(for { + pekko.pattern.after(10 seconds)(for { bucketContentsOne <- S3.listBucket(firstS3Config.dataBucket, None).withAttributes(s3Attrs).runWith(Sink.seq) bucketContentsTwo <- diff --git a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3GzipCompressionBackupClientSpec.scala b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3GzipCompressionBackupClientSpec.scala index b60c693e..6172f3d6 100644 --- a/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3GzipCompressionBackupClientSpec.scala +++ b/backup-s3/src/test/scala/io/aiven/guardian/kafka/backup/s3/RealS3GzipCompressionBackupClientSpec.scala @@ -1,9 +1,9 @@ package io.aiven.guardian.kafka.backup.s3 -import akka.actor.ActorSystem -import io.aiven.guardian.akka.AnyPropTestKit import io.aiven.guardian.kafka.backup.configs.Compression import io.aiven.guardian.kafka.models.Gzip +import io.aiven.guardian.pekko.AnyPropTestKit +import org.apache.pekko.actor.ActorSystem class RealS3GzipCompressionBackupClientSpec extends AnyPropTestKit(ActorSystem("RealS3GzipCompressionBackupClientSpec")) diff --git a/build.sbt b/build.sbt index df520e93..afc65fb1 100644 --- a/build.sbt +++ b/build.sbt @@ -8,28 +8,32 @@ ThisBuild / organizationHomepage := Some(url("https://aiven.io/")) ThisBuild / resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots" -val akkaVersion = "2.6.20" -val akkaHttpVersion = "10.2.10" -val alpakkaKafkaVersion = "3.0.1" -val kafkaClientsVersion = "3.4.0" -val alpakkaVersion = "4.0.0" -val futilesVersion = "2.0.2" -val quillJdbcMonixVersion = "3.7.2" -val postgresqlJdbcVersion = "42.6.0" -val scalaLoggingVersion = "3.9.5" -val logbackClassicVersion = "1.4.7" -val declineVersion = "2.4.1" -val pureConfigVersion = "0.17.4" -val scalaTestVersion = "3.2.16" -val scalaTestScalaCheckVersion = s"$scalaTestVersion.0" -val akkaStreamsJson = "0.9.0" -val diffxVersion = "0.8.3" -val testContainersVersion = "0.40.16" -val testContainersJavaVersion = "1.18.3" -val scalaCheckVersion = "1.17.0" -val scalaCheckOpsVersion = "2.10.0" -val enumeratumVersion = "1.7.2" -val organizeImportsVersion = "0.6.0" +// TODO: Remove when Pekko has a proper release +ThisBuild / resolvers += Resolver.ApacheMavenSnapshotsRepo +ThisBuild / updateOptions := updateOptions.value.withLatestSnapshots(false) + +val pekkoVersion = "0.0.0+26669-ec5b6764-SNAPSHOT" +val pekkoHttpVersion = "0.0.0+4411-6fe04045-SNAPSHOT" +val pekkoConnectorsKafkaVersion = "0.0.0+1738-07a19b8e-SNAPSHOT" +val kafkaClientsVersion = "3.4.0" +val pekkoConnectorsVersion = "0.0.0+85-a82f3c3c-SNAPSHOT" +val futilesVersion = "2.0.2" +val quillJdbcMonixVersion = "3.7.2" +val postgresqlJdbcVersion = "42.6.0" +val scalaLoggingVersion = "3.9.5" +val logbackClassicVersion = "1.4.7" +val declineVersion = "2.4.1" +val pureConfigVersion = "0.17.4" +val scalaTestVersion = "3.2.16" +val scalaTestScalaCheckVersion = s"$scalaTestVersion.0" +val pekkoStreamCirceVersion = "0.0.0+94-dbf3173f-SNAPSHOT" +val diffxVersion = "0.8.3" +val testContainersVersion = "0.40.16" +val testContainersJavaVersion = "1.18.3" +val scalaCheckVersion = "1.17.0" +val scalaCheckOpsVersion = "2.10.0" +val enumeratumVersion = "1.7.2" +val organizeImportsVersion = "0.6.0" /** Calculates the scalatest version in a format that is used for `org.scalatestplus` scalacheck 
artifacts * @@ -127,27 +131,27 @@ lazy val core = project librarySettings, name := s"$baseName-core", libraryDependencies ++= Seq( - "com.typesafe.akka" %% "akka-actor" % akkaVersion, - "com.typesafe.akka" %% "akka-stream" % akkaVersion, - "com.typesafe.akka" %% "akka-stream-kafka" % alpakkaKafkaVersion, + "org.apache.pekko" %% "pekko-actor" % pekkoVersion, + "org.apache.pekko" %% "pekko-stream" % pekkoVersion, + "org.apache.pekko" %% "pekko-connectors-kafka" % pekkoConnectorsKafkaVersion, // Ideally we shouldn't be explicitly providing a kafka-clients version and instead getting the version - // transitively from akka-streams-kafka however there isn't a nice way to extract a transitive dependency + // transitively from pekko-connectors-kafka however there isn't a nice way to extract a transitive dependency // for usage in linking to documentation. "org.apache.kafka" % "kafka-clients" % kafkaClientsVersion, "com.typesafe.scala-logging" %% "scala-logging" % scalaLoggingVersion, "com.github.pureconfig" %% "pureconfig" % pureConfigVersion, "ch.qos.logback" % "logback-classic" % logbackClassicVersion, - "org.mdedetrich" %% "akka-stream-circe" % akkaStreamsJson, + "org.mdedetrich" %% "pekko-stream-circe" % pekkoStreamCirceVersion, "com.markatta" %% "futiles" % futilesVersion, - "com.typesafe.akka" %% "akka-actor" % akkaVersion % Test, - "com.typesafe.akka" %% "akka-stream" % akkaVersion % Test, + "org.apache.pekko" %% "pekko-actor" % pekkoVersion % Test, + "org.apache.pekko" %% "pekko-stream" % pekkoVersion % Test, "org.scalatest" %% "scalatest" % scalaTestVersion % Test, "org.scalatestplus" %% scalaTestScalaCheckArtifact % scalaTestScalaCheckVersion % Test, "org.scalacheck" %% "scalacheck" % scalaCheckVersion % Test, "com.rallyhealth" %% "scalacheck-ops_1-16" % scalaCheckOpsVersion % Test, "com.softwaremill.diffx" %% "diffx-scalatest-must" % diffxVersion % Test, - "com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test, - "com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion % Test, + "org.apache.pekko" %% "pekko-stream-testkit" % pekkoVersion % Test, + "org.apache.pekko" %% "pekko-http-testkit" % pekkoHttpVersion % Test, "com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersVersion % Test, "com.dimafeng" %% "testcontainers-scala-kafka" % testContainersVersion % Test, "org.testcontainers" % "kafka" % testContainersJavaVersion % Test @@ -166,11 +170,11 @@ lazy val coreCli = project ) ++ flagsFor13, name := s"$baseName-core-cli", libraryDependencies ++= Seq( - "com.typesafe.akka" %% "akka-actor" % akkaVersion, - "com.typesafe.akka" %% "akka-stream" % akkaVersion, - "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, - "com.monovore" %% "decline" % declineVersion, - "com.beachape" %% "enumeratum" % enumeratumVersion + "org.apache.pekko" %% "pekko-actor" % pekkoVersion, + "org.apache.pekko" %% "pekko-stream" % pekkoVersion, + "org.apache.pekko" %% "pekko-slf4j" % pekkoVersion, + "com.monovore" %% "decline" % declineVersion, + "com.beachape" %% "enumeratum" % enumeratumVersion ) ) .dependsOn(core) @@ -181,10 +185,10 @@ lazy val coreS3 = project librarySettings, name := s"$baseName-s3", libraryDependencies ++= Seq( - "com.lightbend.akka" %% "akka-stream-alpakka-s3" % alpakkaVersion, - // Ordinarily this would be in Test scope however if its not then a lower version of akka-http-xml which has a + "org.apache.pekko" %% "pekko-connectors-s3" % pekkoConnectorsVersion, + // Ordinarily this would be in Test scope however if it's not then a lower version of pekko-http-xml which 
has a // security vulnerability gets resolved in Compile scope - "com.typesafe.akka" %% "akka-http-xml" % akkaHttpVersion, + "org.apache.pekko" %% "pekko-http-xml" % pekkoHttpVersion, "org.scalatest" %% "scalatest" % scalaTestVersion % Test, "org.scalatestplus" %% scalaTestScalaCheckArtifact % scalaTestScalaCheckVersion % Test, "com.monovore" %% "decline" % declineVersion % Test @@ -198,10 +202,10 @@ lazy val coreGCS = project librarySettings, name := s"$baseName-gcs", libraryDependencies ++= Seq( - "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-storage" % alpakkaVersion, - // Ordinarily this would be in Test scope however if its not then a lower version of akka-http-spray-json which + "org.apache.pekko" %% "pekko-connectors-google-cloud-storage" % pekkoConnectorsVersion, + // Ordinarily this would be in Test scope however if it's not then a lower version of pekko-http-spray-json which // has a security vulnerability gets resolved in Compile scope - "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion, + "org.apache.pekko" %% "pekko-http-spray-json" % pekkoHttpVersion, "org.scalatest" %% "scalatest" % scalaTestVersion % Test, "org.scalatestplus" %% scalaTestScalaCheckArtifact % scalaTestScalaCheckVersion % Test ) @@ -339,18 +343,19 @@ lazy val docs = project git.remoteRepo := scmInfo.value.get.connection.replace("scm:git:", ""), paradoxGroups := Map("Language" -> Seq("Scala")), paradoxProperties ++= Map( - "akka.version" -> akkaVersion, - "akka-http.version" -> akkaHttpVersion, - "akka-streams-json.version" -> akkaStreamsJson, - "pure-config.version" -> pureConfigVersion, - "decline.version" -> declineVersion, - "scala-logging.version" -> scalaLoggingVersion, - "extref.akka.base_url" -> s"https://doc.akka.io/api/akka/${binaryVersion(akkaVersion)}/%s", - "extref.akka-docs.base_url" -> s"https://doc.akka.io/docs/akka/${binaryVersion(akkaVersion)}/%s", - "extref.akka-stream-json.base_url" -> s"https://github.com/mdedetrich/akka-streams-json", - "extref.alpakka.base_url" -> s"https://doc.akka.io/api/alpakka/${binaryVersion(alpakkaVersion)}/%s", - "extref.alpakka-docs.base_url" -> s"https://docs.akka.io/docs/alpakka/${binaryVersion(alpakkaVersion)}/%s", - "extref.alpakka-kafka-docs.base_url" -> s"https://docs.akka.io/docs/alpakka-kafka/${binaryVersion(alpakkaVersion)}/%s", + "pekko.version" -> pekkoVersion, + "pekko-http.version" -> pekkoHttpVersion, + "pekko-stream-circe.version" -> pekkoStreamCirceVersion, + "pure-config.version" -> pureConfigVersion, + "decline.version" -> declineVersion, + "scala-logging.version" -> scalaLoggingVersion, + // TODO: Replace current with binaryVersion(pekkoVersion) when pekko is released + "extref.pekko.base_url" -> s"https://pekko.apache.org/api/pekko/current/%s", + "extref.pekko-docs.base_url" -> s"https://pekko.apache.org/docs/pekko/current/%s", + "extref.pekko-stream-circe.base_url" -> s"https://github.com/mdedetrich/pekko-streams-circe", + "extref.pekko-connectors.base_url" -> s"https://pekko.apache.org/api/pekko-connectors/current/%s", + "extref.pekko-connectors-docs.base_url" -> s"https://pekko.apache.org/docs/pekko-connectors/current/%s", + "extref.pekko-connectors-kafka-docs.base_url" -> s"https://pekko.apache.org/docs/pekko-connectors-kafka/current/%s", "extref.kafka-docs.base_url" -> s"https://kafka.apache.org/${binaryVersion(kafkaClientsVersion).replace(".", "")}/%s", "extref.pureconfig.base_url" -> s"https://pureconfig.github.io/docs/", "extref.scalatest.base_url" -> 
s"https://www.scalatest.org/scaladoc/$scalaTestVersion/org/scalatest/%s", @@ -436,11 +441,11 @@ ThisBuild / githubWorkflowBuild := Seq( List("clean", "coverage", "test"), name = Some("Build project"), env = Map( - "ALPAKKA_S3_REGION_PROVIDER" -> "static", - "ALPAKKA_S3_REGION_DEFAULT_REGION" -> "us-west-2", - "ALPAKKA_S3_AWS_CREDENTIALS_PROVIDER" -> "static", - "ALPAKKA_S3_AWS_CREDENTIALS_ACCESS_KEY_ID" -> "${{ secrets.AWS_ACCESS_KEY }}", - "ALPAKKA_S3_AWS_CREDENTIALS_SECRET_ACCESS_KEY" -> "${{ secrets.AWS_SECRET_KEY }}" + "PEKKO_CONNECTORS_S3_REGION_PROVIDER" -> "static", + "PEKKO_CONNECTORS_S3_REGION_DEFAULT_REGION" -> "us-west-2", + "PEKKO_CONNECTORS_S3_AWS_CREDENTIALS_PROVIDER" -> "static", + "PEKKO_CONNECTORS_S3_AWS_CREDENTIALS_ACCESS_KEY_ID" -> "${{ secrets.AWS_ACCESS_KEY }}", + "PEKKO_CONNECTORS_S3_AWS_CREDENTIALS_SECRET_ACCESS_KEY" -> "${{ secrets.AWS_SECRET_KEY }}" ) ), WorkflowStep.Sbt(List("docs/makeSite"), name = Some("Compile docs")) diff --git a/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/App.scala b/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/App.scala index 651175aa..9025fb3d 100644 --- a/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/App.scala +++ b/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/App.scala @@ -1,18 +1,20 @@ package io.aiven.guardian.kafka.backup -import akka.Done -import akka.actor.ActorSystem -import akka.kafka.scaladsl.Consumer -import akka.stream.ActorAttributes -import akka.stream.Supervision import com.typesafe.scalalogging.LazyLogging import io.aiven.guardian.kafka.backup.BackupClientInterface import io.aiven.guardian.kafka.backup.KafkaConsumer import io.aiven.guardian.kafka.backup.KafkaConsumerInterface +import org.apache.pekko import scala.concurrent.ExecutionContext import scala.concurrent.Future +import pekko.Done +import pekko.actor.ActorSystem +import pekko.kafka.scaladsl.Consumer +import pekko.stream.ActorAttributes +import pekko.stream.Supervision + trait App[T <: KafkaConsumerInterface] extends LazyLogging { implicit val kafkaClient: T implicit val backupClient: BackupClientInterface[KafkaConsumer] diff --git a/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupApp.scala b/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupApp.scala index 0f0ea331..4c8d18fb 100644 --- a/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupApp.scala +++ b/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupApp.scala @@ -1,10 +1,10 @@ package io.aiven.guardian.kafka.backup -import io.aiven.guardian.cli.AkkaSettings +import io.aiven.guardian.cli.PekkoSettings import io.aiven.guardian.kafka.backup.KafkaConsumer import io.aiven.guardian.kafka.backup.{Config => BackupConfig} import io.aiven.guardian.kafka.{Config => KafkaConfig} -trait BackupApp extends BackupConfig with KafkaConfig with AkkaSettings { +trait BackupApp extends BackupConfig with KafkaConfig with PekkoSettings { implicit lazy val kafkaClient: KafkaConsumer = new KafkaConsumer() } diff --git a/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/Main.scala b/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/Main.scala index 992da8cc..9c2984dc 100644 --- a/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/Main.scala +++ b/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/Main.scala @@ -1,7 +1,5 @@ package io.aiven.guardian.kafka.backup -import akka.kafka.ConsumerSettings -import akka.stream.RestartSettings import cats.implicits._ import com.monovore.decline._ import 
io.aiven.guardian.cli.MainUtils @@ -12,6 +10,7 @@ import io.aiven.guardian.kafka.backup.configs._ import io.aiven.guardian.kafka.configs.KafkaCluster import io.aiven.guardian.kafka.models.Gzip import io.aiven.guardian.kafka.s3.configs.S3 +import org.apache.pekko import org.slf4j.LoggerFactory import pureconfig.ConfigSource @@ -25,6 +24,9 @@ import java.time.temporal.ChronoUnit import java.util.Properties import java.util.concurrent.atomic.AtomicReference +import pekko.kafka.ConsumerSettings +import pekko.stream.RestartSettings + class Entry(val initializedApp: AtomicReference[Option[(App[_], Promise[Unit])]] = new AtomicReference(None)) extends CommandApp( name = "guardian-backup", @@ -120,7 +122,7 @@ class Entry(val initializedApp: AtomicReference[Option[(App[_], Promise[Unit])]] block.withBootstrapServers(value.toList.mkString(",")) Some(block).validNel - case None if Options.checkConfigKeyIsDefined("akka.kafka.consumer.kafka-clients.bootstrap.servers") => + case None if Options.checkConfigKeyIsDefined("pekko.kafka.consumer.kafka-clients.bootstrap.servers") => None.validNel case _ => "bootstrap-servers is a mandatory value that needs to be configured".invalidNel } diff --git a/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/S3App.scala b/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/S3App.scala index 1ffc2bd1..d63d87fc 100644 --- a/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/S3App.scala +++ b/cli-backup/src/main/scala/io/aiven/guardian/kafka/backup/S3App.scala @@ -1,9 +1,11 @@ package io.aiven.guardian.kafka.backup -import akka.stream.alpakka.s3.S3Settings import io.aiven.guardian.kafka.backup.KafkaConsumer import io.aiven.guardian.kafka.backup.s3.BackupClient import io.aiven.guardian.kafka.s3.{Config => S3Config} +import org.apache.pekko + +import pekko.stream.connectors.s3.S3Settings trait S3App extends S3Config with BackupApp with App[KafkaConsumer] { lazy val s3Settings: S3Settings = S3Settings() diff --git a/cli-backup/src/test/scala/io/aiven/guardian/kafka/backup/CliSpec.scala b/cli-backup/src/test/scala/io/aiven/guardian/kafka/backup/CliSpec.scala index 542de7c6..81ee9778 100644 --- a/cli-backup/src/test/scala/io/aiven/guardian/kafka/backup/CliSpec.scala +++ b/cli-backup/src/test/scala/io/aiven/guardian/kafka/backup/CliSpec.scala @@ -1,7 +1,5 @@ package io.aiven.guardian.kafka.backup -import akka.actor.ActorSystem -import akka.testkit.TestKit import com.typesafe.scalalogging.StrictLogging import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice import io.aiven.guardian.kafka.backup.configs.Compression @@ -9,6 +7,7 @@ import io.aiven.guardian.kafka.backup.configs.{Backup => BackupConfig} import io.aiven.guardian.kafka.configs.{KafkaCluster => KafkaClusterConfig} import io.aiven.guardian.kafka.models.Gzip import markatta.futiles.CancellableFuture +import org.apache.pekko import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.must.Matchers import org.scalatest.propspec.AnyPropSpecLike @@ -23,6 +22,9 @@ import scala.language.postfixOps import java.time.temporal.ChronoUnit import java.util.concurrent.TimeUnit +import pekko.actor.ActorSystem +import pekko.testkit.TestKit + @nowarn("msg=method main in class CommandApp is deprecated") class CliSpec extends TestKit(ActorSystem("BackupCliSpec")) @@ -66,7 +68,7 @@ class CliSpec def checkUntilMainInitialized(main: io.aiven.guardian.kafka.backup.Entry): Future[(App[_], Promise[Unit])] = main.initializedApp.get() match { case Some((app, promise)) => Future.successful((app, promise)) - 
case None => akka.pattern.after(100 millis)(checkUntilMainInitialized(main)) + case None => pekko.pattern.after(100 millis)(checkUntilMainInitialized(main)) } val (app, promise) = checkUntilMainInitialized(Main).futureValue diff --git a/cli-restore/src/main/scala/io/aiven/guardian/kafka/restore/App.scala b/cli-restore/src/main/scala/io/aiven/guardian/kafka/restore/App.scala index 6fd5880f..12182647 100644 --- a/cli-restore/src/main/scala/io/aiven/guardian/kafka/restore/App.scala +++ b/cli-restore/src/main/scala/io/aiven/guardian/kafka/restore/App.scala @@ -1,17 +1,19 @@ package io.aiven.guardian.kafka.restore -import akka.Done -import akka.actor.ActorSystem -import akka.stream.ActorAttributes -import akka.stream.KillSwitch -import akka.stream.Supervision -import akka.stream.UniqueKillSwitch import com.typesafe.scalalogging.LazyLogging import io.aiven.guardian.kafka.restore.KafkaProducer import io.aiven.guardian.kafka.restore.s3.RestoreClient +import org.apache.pekko import scala.concurrent.Future +import pekko.Done +import pekko.actor.ActorSystem +import pekko.stream.ActorAttributes +import pekko.stream.KillSwitch +import pekko.stream.Supervision +import pekko.stream.UniqueKillSwitch + trait App extends LazyLogging { implicit val kafkaProducer: KafkaProducer implicit val restoreClient: RestoreClient[KafkaProducer] diff --git a/cli-restore/src/main/scala/io/aiven/guardian/kafka/restore/Main.scala b/cli-restore/src/main/scala/io/aiven/guardian/kafka/restore/Main.scala index 795bccca..4b930712 100644 --- a/cli-restore/src/main/scala/io/aiven/guardian/kafka/restore/Main.scala +++ b/cli-restore/src/main/scala/io/aiven/guardian/kafka/restore/Main.scala @@ -1,7 +1,5 @@ package io.aiven.guardian.kafka.restore -import akka.kafka.ProducerSettings -import akka.stream.RestartSettings import cats.data.ValidatedNel import com.monovore.decline._ import com.monovore.decline.time._ @@ -13,6 +11,7 @@ import io.aiven.guardian.kafka.configs.KafkaCluster import io.aiven.guardian.kafka.restore.configs.Restore import io.aiven.guardian.kafka.s3.configs.S3 import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.pekko import org.slf4j.LoggerFactory import pureconfig.ConfigSource @@ -25,6 +24,9 @@ import java.time.OffsetDateTime import java.util.Properties import java.util.concurrent.atomic.AtomicReference +import pekko.kafka.ProducerSettings +import pekko.stream.RestartSettings + class Entry(val initializedApp: AtomicReference[Option[App]] = new AtomicReference(None)) extends CommandApp( name = "guardian-restore", @@ -92,7 +94,7 @@ class Entry(val initializedApp: AtomicReference[Option[App]] = new AtomicReferen Some(block).validNel case None - if Options.checkConfigKeyIsDefined("akka.kafka.producer.kafka-clients.bootstrap.servers") || Options + if Options.checkConfigKeyIsDefined("pekko.kafka.producer.kafka-clients.bootstrap.servers") || Options .checkConfigKeyIsDefined("kafka-client.bootstrap.servers") => None.validNel case _ => "bootstrap-servers is a mandatory value that needs to be configured".invalidNel diff --git a/cli-restore/src/main/scala/io/aiven/guardian/kafka/restore/RestoreApp.scala b/cli-restore/src/main/scala/io/aiven/guardian/kafka/restore/RestoreApp.scala index f1f084c3..20eaf771 100644 --- a/cli-restore/src/main/scala/io/aiven/guardian/kafka/restore/RestoreApp.scala +++ b/cli-restore/src/main/scala/io/aiven/guardian/kafka/restore/RestoreApp.scala @@ -1,10 +1,10 @@ package io.aiven.guardian.kafka.restore -import io.aiven.guardian.cli.AkkaSettings +import 
io.aiven.guardian.cli.PekkoSettings import io.aiven.guardian.kafka.restore.KafkaProducer import io.aiven.guardian.kafka.restore.{Config => RestoreConfig} import io.aiven.guardian.kafka.{Config => KafkaConfig} -trait RestoreApp extends RestoreConfig with KafkaConfig with AkkaSettings { +trait RestoreApp extends RestoreConfig with KafkaConfig with PekkoSettings { implicit lazy val kafkaProducer: KafkaProducer = new KafkaProducer() } diff --git a/cli-restore/src/main/scala/io/aiven/guardian/kafka/restore/S3App.scala b/cli-restore/src/main/scala/io/aiven/guardian/kafka/restore/S3App.scala index bb427762..fe5fa5ca 100644 --- a/cli-restore/src/main/scala/io/aiven/guardian/kafka/restore/S3App.scala +++ b/cli-restore/src/main/scala/io/aiven/guardian/kafka/restore/S3App.scala @@ -1,12 +1,14 @@ package io.aiven.guardian.kafka.restore -import akka.stream.ActorAttributes -import akka.stream.Attributes -import akka.stream.Supervision -import akka.stream.alpakka.s3.S3Settings import com.typesafe.scalalogging.LazyLogging import io.aiven.guardian.kafka.restore.s3.RestoreClient import io.aiven.guardian.kafka.s3.{Config => S3Config} +import org.apache.pekko + +import pekko.stream.ActorAttributes +import pekko.stream.Attributes +import pekko.stream.Supervision +import pekko.stream.connectors.s3.S3Settings trait S3App extends S3Config with RestoreApp with App with LazyLogging { lazy val s3Settings: S3Settings = S3Settings() diff --git a/compaction-gcs/src/main/scala/io/aiven/guardian/kafka/compaction/gcs/StorageClient.scala b/compaction-gcs/src/main/scala/io/aiven/guardian/kafka/compaction/gcs/StorageClient.scala index d464862d..2ab79ed8 100644 --- a/compaction-gcs/src/main/scala/io/aiven/guardian/kafka/compaction/gcs/StorageClient.scala +++ b/compaction-gcs/src/main/scala/io/aiven/guardian/kafka/compaction/gcs/StorageClient.scala @@ -1,16 +1,18 @@ package io.aiven.guardian.kafka.compaction.gcs -import akka.NotUsed -import akka.stream.alpakka.googlecloud.storage.scaladsl.GCStorage -import akka.stream.scaladsl.Source import com.typesafe.scalalogging.LazyLogging import io.aiven.guardian.kafka.compaction.StorageInterface import io.aiven.guardian.kafka.compaction.gcs.models.StorageConfig import io.aiven.guardian.kafka.gcs.errors.GCSErrors import io.aiven.guardian.kafka.models.ReducedConsumerRecord +import org.apache.pekko import scala.annotation.nowarn +import pekko.NotUsed +import pekko.stream.connectors.googlecloud.storage.scaladsl.GCStorage +import pekko.stream.scaladsl.Source + class StorageClient(bucketName: String, maybePrefix: Option[String])(implicit storageConfig: StorageConfig) extends StorageInterface with LazyLogging { diff --git a/compaction-s3/src/main/scala/io/aiven/guardian/kafka/compaction/s3/StorageClient.scala b/compaction-s3/src/main/scala/io/aiven/guardian/kafka/compaction/s3/StorageClient.scala index 71b2b816..9e7ae741 100644 --- a/compaction-s3/src/main/scala/io/aiven/guardian/kafka/compaction/s3/StorageClient.scala +++ b/compaction-s3/src/main/scala/io/aiven/guardian/kafka/compaction/s3/StorageClient.scala @@ -1,18 +1,20 @@ package io.aiven.guardian.kafka.compaction.s3 -import akka.NotUsed -import akka.stream.alpakka.s3.BucketAccess -import akka.stream.alpakka.s3.S3Headers -import akka.stream.alpakka.s3.scaladsl.S3 -import akka.stream.scaladsl.Source import com.typesafe.scalalogging.LazyLogging import io.aiven.guardian.kafka.compaction.StorageInterface import io.aiven.guardian.kafka.compaction.s3.models.StorageConfig import io.aiven.guardian.kafka.models.ReducedConsumerRecord import 
io.aiven.guardian.kafka.s3.errors.S3Errors +import org.apache.pekko import scala.annotation.nowarn +import pekko.NotUsed +import pekko.stream.connectors.s3.BucketAccess +import pekko.stream.connectors.s3.S3Headers +import pekko.stream.connectors.s3.scaladsl.S3 +import pekko.stream.scaladsl.Source + class StorageClient(bucketName: String, prefix: Option[String], s3Headers: S3Headers)(implicit storageConfig: StorageConfig ) extends StorageInterface diff --git a/core-backup/src/main/resources/reference.conf b/core-backup/src/main/resources/reference.conf index 29af6267..b8ec97e1 100644 --- a/core-backup/src/main/resources/reference.conf +++ b/core-backup/src/main/resources/reference.conf @@ -1,36 +1,36 @@ -akka.kafka.consumer = { - poll-interval = ${?AKKA_KAFKA_CONSUMER_POLL_INTERVAL} - poll-timeout = ${?AKKA_KAFKA_CONSUMER_POLL_TIMEOUT} - stop-timeout = ${?AKKA_KAFKA_CONSUMER_STOP_TIMEOUT} - close-timeout = ${?AKKA_KAFKA_CONSUMER_CLOSE_TIMEOUT} - commit-time-warning = ${?AKKA_KAFKA_CONSUMER_COMMIT_TIME_WARNING} - commit-refresh-interval = ${?AKKA_KAFKA_CONSUMER_COMMIT_REFRESH_INTERVAL} - use-dispatcher = ${?AKKA_KAFKA_CONSUMER_USE_DISPATCHER} - wait-close-partition = ${?AKKA_KAFKA_CONSUMER_WAIT_CLOSE_PARTITION} - position-timeout = ${?AKKA_KAFKA_CONSUMER_POSITION_TIMEOUT} - offset-for-times-timeout = ${?AKKA_KAFKA_OFFSET_FOR_TIMES_TIMEOUT} - metadata-request-timeout = ${?AKKA_KAFKA_METADATA_REQUEST_TIMEOUT} - eos-draining-check-interval = ${?AKKA_KAFKA_CONSUMER_EOS_DRAINING_CHECK_INTERVAL} +pekko.kafka.consumer = { + poll-interval = ${?PEKKO_KAFKA_CONSUMER_POLL_INTERVAL} + poll-timeout = ${?PEKKO_KAFKA_CONSUMER_POLL_TIMEOUT} + stop-timeout = ${?PEKKO_KAFKA_CONSUMER_STOP_TIMEOUT} + close-timeout = ${?PEKKO_KAFKA_CONSUMER_CLOSE_TIMEOUT} + commit-time-warning = ${?PEKKO_KAFKA_CONSUMER_COMMIT_TIME_WARNING} + commit-refresh-interval = ${?PEKKO_KAFKA_CONSUMER_COMMIT_REFRESH_INTERVAL} + use-dispatcher = ${?PEKKO_KAFKA_CONSUMER_USE_DISPATCHER} + wait-close-partition = ${?PEKKO_KAFKA_CONSUMER_WAIT_CLOSE_PARTITION} + position-timeout = ${?PEKKO_KAFKA_CONSUMER_POSITION_TIMEOUT} + offset-for-times-timeout = ${?PEKKO_KAFKA_OFFSET_FOR_TIMES_TIMEOUT} + metadata-request-timeout = ${?PEKKO_KAFKA_METADATA_REQUEST_TIMEOUT} + eos-draining-check-interval = ${?PEKKO_KAFKA_CONSUMER_EOS_DRAINING_CHECK_INTERVAL} connection-checker = { - enable = ${?AKKA_KAFKA_CONSUMER_CONNECTION_CHECKER_ENABLE} - max-retries = ${?AKKA_KAFKA_CONSUMER_CONNECTION_CHECKER_MAX_RETRIES} - backoff-factor = ${?AKKA_KAFKA_CONSUMER_CONNECTION_CHECKER_BACKOFF_FACTOR} - check-interval = ${?AKKA_KAFKA_CONSUMER_CONNECTION_CHECKER_CHECK_INTERVAL} + enable = ${?PEKKO_KAFKA_CONSUMER_CONNECTION_CHECKER_ENABLE} + max-retries = ${?PEKKO_KAFKA_CONSUMER_CONNECTION_CHECKER_MAX_RETRIES} + backoff-factor = ${?PEKKO_KAFKA_CONSUMER_CONNECTION_CHECKER_BACKOFF_FACTOR} + check-interval = ${?PEKKO_KAFKA_CONSUMER_CONNECTION_CHECKER_CHECK_INTERVAL} } - partition-handler-warning = ${?AKKA_KAFKA_CONSUMER_PARTITION_HANDLER_WARNING} + partition-handler-warning = ${?PEKKO_KAFKA_CONSUMER_PARTITION_HANDLER_WARNING} offset-reset-protection = { - enable = ${?AKKA_KAFKA_CONSUMER_OFFSET_RESET_PROTECTION_ENABLE} - offset-threshold = ${?AKKA_KAFKA_CONSUMER_OFFSET_RESET_PROTECTION_OFFSET_THRESHOLD} - time-threshold = ${?AKKA_KAFKA_CONSUMER_OFFSET_RESET_PROTECTION_TIME_THRESHOLD} + enable = ${?PEKKO_KAFKA_CONSUMER_OFFSET_RESET_PROTECTION_ENABLE} + offset-threshold = ${?PEKKO_KAFKA_CONSUMER_OFFSET_RESET_PROTECTION_OFFSET_THRESHOLD} + time-threshold = 
${?PEKKO_KAFKA_CONSUMER_OFFSET_RESET_PROTECTION_TIME_THRESHOLD} } } -akka.kafka.committer = { +pekko.kafka.committer = { max-batch = 100000 - max-batch = ${?AKKA_KAFKA_COMMITTER_MAX_BATCH} + max-batch = ${?PEKKO_KAFKA_COMMITTER_MAX_BATCH} max-interval = 1 hour - max-interval = ${?AKKA_KAFKA_COMMITTER_MAX_INTERVAL} - parallelism = ${?AKKA_KAFKA_COMMITTER_PARALLELISM} + max-interval = ${?PEKKO_KAFKA_COMMITTER_MAX_INTERVAL} + parallelism = ${?PEKKO_KAFKA_COMMITTER_PARALLELISM} parallelism = 10000 } diff --git a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala index 77323011..47e330af 100644 --- a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala +++ b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/BackupClientInterface.scala @@ -1,10 +1,5 @@ package io.aiven.guardian.kafka.backup -import akka.NotUsed -import akka.actor.ActorSystem -import akka.stream.SubstreamCancelStrategy -import akka.stream.scaladsl._ -import akka.util.ByteString import com.typesafe.scalalogging.LazyLogging import io.aiven.guardian.kafka.Errors import io.aiven.guardian.kafka.backup.configs.{Compression => CompressionConfig, _} @@ -14,6 +9,7 @@ import io.aiven.guardian.kafka.models.CompressionType import io.aiven.guardian.kafka.models.Gzip import io.aiven.guardian.kafka.models.ReducedConsumerRecord import io.circe.syntax._ +import org.apache.pekko import scala.annotation.nowarn import scala.concurrent.ExecutionContext @@ -25,6 +21,12 @@ import java.time._ import java.time.format.DateTimeFormatter import java.time.temporal._ +import pekko.NotUsed +import pekko.actor.ActorSystem +import pekko.stream.SubstreamCancelStrategy +import pekko.stream.scaladsl._ +import pekko.util.ByteString + /** An interface for a template on how to backup a Kafka Stream into some data storage * @tparam T * The underlying `kafkaClientInterface` type @@ -78,32 +80,32 @@ trait BackupClientInterface[T <: KafkaConsumerInterface] extends LazyLogging { def getCurrentUploadState(key: String): Future[UploadStateResult] /** A sink that is executed whenever a previously existing Backup needs to be terminated and closed. Generally - * speaking this [[akka.stream.scaladsl.Sink]] is similar to the `backupToStorageSink` except that + * speaking this [[pekko.stream.scaladsl.Sink]] is similar to the `backupToStorageSink` except that * `kafkaClientInterface.CursorContext` is not required since no Kafka messages are being written. * * Note that the terminate refers to the fact that this Sink is executed with a `null]` - * [[akka.stream.scaladsl.Source]] which when written to an already existing unfinished backup terminates the + * [[pekko.stream.scaladsl.Source]] which when written to an already existing unfinished backup terminates the * containing JSON array so that it becomes valid parsable JSON. 
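   * As an illustrative sketch of the termination behaviour described above (assuming an implicit
   * [[pekko.stream.Materializer]] is in scope; `previousState` is the parameter documented below): if an
   * unfinished backup ends with a trailing comma, e.g. `[{"k":"v"},`, running this sink with a source that
   * emits the bytes `null]` leaves `[{"k":"v"},null]` in storage, which parses as a JSON array.
   * {{{
   * Source.single(ByteString("null]")).runWith(backupToStorageTerminateSink(previousState))
   * }}}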
* @param previousState * A data structure containing both the [[State]] along with the associated key which you can refer to in order to - * define your [[akka.stream.scaladsl.Sink]] + * define your [[pekko.stream.scaladsl.Sink]] * @return - * A [[akka.stream.scaladsl.Sink]] that points to an existing key defined by `previousState.previousKey` + * A [[pekko.stream.scaladsl.Sink]] that points to an existing key defined by `previousState.previousKey` */ def backupToStorageTerminateSink(previousState: PreviousState): Sink[ByteString, Future[BackupResult]] - /** Override this method to define how to backup a [[akka.util.ByteString]] combined with Kafka + /** Override this method to define how to backup a [[pekko.util.ByteString]] combined with Kafka * `kafkaClientInterface.CursorContext` to a `DataSource` * @param key * The object key or filename for what is being backed up * @param currentState * The current state if it exists. If this is empty then a new backup is being created with the associated `key` - * otherwise if this contains a [[State]] then the defined [[akka.stream.scaladsl.Sink]] needs to handle resuming a - * previously unfinished backup with that `key` by directly appending the [[akka.util.ByteString]] data. + * otherwise if this contains a [[State]] then the defined [[pekko.stream.scaladsl.Sink]] needs to handle resuming + * a previously unfinished backup with that `key` by directly appending the [[pekko.util.ByteString]] data. * @return - * A [[akka.stream.scaladsl.Sink]] that given a [[akka.util.ByteString]] (containing a single Kafka + * A [[pekko.stream.scaladsl.Sink]] that given a [[pekko.util.ByteString]] (containing a single Kafka * [[io.aiven.guardian.kafka.models.ReducedConsumerRecord]]) along with its `kafkaClientInterface.CursorContext` - * backs up the data to your data storage. The [[akka.stream.scaladsl.Sink]] is also responsible for executing + * backs up the data to your data storage. 
The [[pekko.stream.scaladsl.Sink]] is also responsible for executing * `kafkaClientInterface.commitCursor` when the data is successfully backed up */ def backupToStorageSink(key: String, diff --git a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/KafkaConsumer.scala b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/KafkaConsumer.scala index ee41bd34..1fed28a5 100644 --- a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/KafkaConsumer.scala +++ b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/KafkaConsumer.scala @@ -1,17 +1,5 @@ package io.aiven.guardian.kafka.backup -import akka.Done -import akka.actor.ActorSystem -import akka.kafka.CommitDelivery -import akka.kafka.CommitterSettings -import akka.kafka.ConsumerMessage.CommittableOffset -import akka.kafka.ConsumerMessage.CommittableOffsetBatch -import akka.kafka.ConsumerSettings -import akka.kafka.Subscriptions -import akka.kafka.scaladsl.Committer -import akka.kafka.scaladsl.Consumer -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.SourceWithContext import com.typesafe.scalalogging.LazyLogging import io.aiven.guardian.kafka.backup.configs.Backup import io.aiven.guardian.kafka.backup.configs.ChronoUnitSlice @@ -21,6 +9,7 @@ import io.aiven.guardian.kafka.models.ReducedConsumerRecord import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.pekko import scala.collection.immutable import scala.concurrent.Future @@ -28,9 +17,23 @@ import scala.jdk.DurationConverters._ import java.util.Base64 -/** A Kafka Client that uses Alpakka Kafka Consumer under the hood to create a stream of events from a Kafka cluster. To - * configure the Alpakka Kafka Consumer use the standard typesafe configuration i.e. akka.kafka.consumer (note that the - * `keySerializer` and `valueSerializer` are hardcoded so you cannot override this). +import pekko.Done +import pekko.actor.ActorSystem +import pekko.kafka.CommitDelivery +import pekko.kafka.CommitterSettings +import pekko.kafka.ConsumerMessage.CommittableOffset +import pekko.kafka.ConsumerMessage.CommittableOffsetBatch +import pekko.kafka.ConsumerSettings +import pekko.kafka.Subscriptions +import pekko.kafka.scaladsl.Committer +import pekko.kafka.scaladsl.Consumer +import pekko.stream.scaladsl.Sink +import pekko.stream.scaladsl.SourceWithContext + +/** A Kafka Client that uses Pekko Connectors Kafka Consumer under the hood to create a stream of events from a Kafka + * cluster. To configure the Pekko Connectors Kafka Consumer use the standard typesafe configuration i.e. + * pekko.kafka.consumer (note that the `keySerializer` and `valueSerializer` are hardcoded so you cannot override + * this). 
* @param configureConsumer * A way to configure the underlying Kafka consumer settings * @param configureCommitter @@ -105,7 +108,7 @@ class KafkaConsumer( /** @return * The result of this function gets directly passed into the `combine` parameter of - * [[akka.stream.scaladsl.Source.toMat]] + * [[pekko.stream.scaladsl.Source.toMat]] */ override def matCombine: (Consumer.Control, Future[Done]) => Consumer.DrainingControl[Done] = Consumer.DrainingControl[Done].apply diff --git a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/KafkaConsumerInterface.scala b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/KafkaConsumerInterface.scala index 5f25a529..11a86188 100644 --- a/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/KafkaConsumerInterface.scala +++ b/core-backup/src/main/scala/io/aiven/guardian/kafka/backup/KafkaConsumerInterface.scala @@ -1,13 +1,15 @@ package io.aiven.guardian.kafka.backup -import akka.Done -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.SourceWithContext import io.aiven.guardian.kafka.models.ReducedConsumerRecord +import org.apache.pekko import scala.collection.immutable import scala.concurrent.Future +import pekko.Done +import pekko.stream.scaladsl.Sink +import pekko.stream.scaladsl.SourceWithContext + trait KafkaConsumerInterface { /** The type of the context to pass around. In context of a Kafka consumer, this typically holds offset data to be @@ -20,7 +22,7 @@ trait KafkaConsumerInterface { type Control /** The type that represents the result of the `combine` parameter that is supplied to - * [[akka.stream.scaladsl.Source.toMat]] + * [[pekko.stream.scaladsl.Source.toMat]] */ type MatCombineResult @@ -40,7 +42,7 @@ trait KafkaConsumerInterface { /** @return * The result of this function gets directly passed into the `combine` parameter of - * [[akka.stream.scaladsl.Source.toMat]] + * [[pekko.stream.scaladsl.Source.toMat]] */ def matCombine: (Control, Future[Done]) => MatCombineResult diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientControlWrapper.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientControlWrapper.scala index 3a0daa42..238b05b4 100644 --- a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientControlWrapper.scala +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientControlWrapper.scala @@ -1,12 +1,14 @@ package io.aiven.guardian.kafka.backup -import akka.Done -import akka.kafka.scaladsl.Consumer -import akka.stream.Materializer +import org.apache.pekko import scala.concurrent.ExecutionContext import scala.concurrent.Future +import pekko.Done +import pekko.kafka.scaladsl.Consumer +import pekko.stream.Materializer + /** A wrapper that is designed to make it easier to cleanly shutdown resources in tests */ class BackupClientControlWrapper[T <: KafkaConsumer](backupClient: BackupClientInterface[T])(implicit diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceSpec.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceSpec.scala index a555d87a..bb4bfa3d 100644 --- a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceSpec.scala +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceSpec.scala @@ -1,8 +1,8 @@ package io.aiven.guardian.kafka.backup -import akka.actor.ActorSystem -import io.aiven.guardian.akka.AnyPropTestKit import io.aiven.guardian.kafka.backup.configs.Compression +import 
io.aiven.guardian.pekko.AnyPropTestKit +import org.apache.pekko.actor.ActorSystem class BackupClientInterfaceSpec extends AnyPropTestKit(ActorSystem("BackupClientInterfaceSpec")) diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceTest.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceTest.scala index 9e3ebd53..9d44c98b 100644 --- a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceTest.scala +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/BackupClientInterfaceTest.scala @@ -1,13 +1,8 @@ package io.aiven.guardian.kafka.backup -import akka.stream.scaladsl.Keep -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source -import akka.util.ByteString import com.softwaremill.diffx.generic.auto._ import com.softwaremill.diffx.scalatest.DiffMustMatcher._ import com.typesafe.scalalogging.StrictLogging -import io.aiven.guardian.akka.AkkaStreamTestKit import io.aiven.guardian.kafka.Generators.KafkaDataWithTimePeriod import io.aiven.guardian.kafka.Generators.kafkaDataWithTimePeriodsGen import io.aiven.guardian.kafka.TestUtils.waitForStartOfTimeUnit @@ -16,8 +11,10 @@ import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst import io.aiven.guardian.kafka.backup.configs.{Compression => CompressionModel} import io.aiven.guardian.kafka.codecs.Circe._ import io.aiven.guardian.kafka.models.ReducedConsumerRecord +import io.aiven.guardian.pekko.PekkoStreamTestKit import org.apache.kafka.common.record.TimestampType -import org.mdedetrich.akka.stream.support.CirceStreamSupport +import org.apache.pekko +import org.mdedetrich.pekko.stream.support.CirceStreamSupport import org.scalatest.Inspectors import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.must.Matchers @@ -35,11 +32,16 @@ import java.time.temporal.ChronoUnit import java.util.concurrent.ConcurrentLinkedDeque import java.util.concurrent.ConcurrentLinkedQueue +import pekko.stream.scaladsl.Keep +import pekko.stream.scaladsl.Sink +import pekko.stream.scaladsl.Source +import pekko.util.ByteString + final case class Periods(periodsBefore: Long, periodsAfter: Long) trait BackupClientInterfaceTest extends AnyPropSpecLike - with AkkaStreamTestKit + with PekkoStreamTestKit with Matchers with ScalaFutures with ScalaCheckPropertyChecks @@ -137,7 +139,7 @@ trait BackupClientInterfaceTest mock.clear() val calculatedFuture = for { _ <- mock.backup.run() - _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) + _ <- pekko.pattern.after(PekkoStreamInitializationConstant)(Future.successful(())) processedRecords = mock.mergeBackedUpData() asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => Source @@ -178,7 +180,7 @@ trait BackupClientInterfaceTest mock.clear() val calculatedFuture = for { _ <- mock.backup.run() - _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) + _ <- pekko.pattern.after(PekkoStreamInitializationConstant)(Future.successful(())) processedRecords = mock.mergeBackedUpData(compression = compression.map(_.`type`)) asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => Source @@ -213,7 +215,7 @@ trait BackupClientInterfaceTest mock.clear() val calculatedFuture = for { _ <- mock.backup.run() - _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) + _ <- pekko.pattern.after(PekkoStreamInitializationConstant)(Future.successful(())) processedRecords = 
mock.mergeBackedUpData(compression = compression.map(_.`type`)) asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => Source @@ -250,7 +252,7 @@ trait BackupClientInterfaceTest mock.clear() val calculatedFuture = for { _ <- mock.backup.run() - _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) + _ <- pekko.pattern.after(PekkoStreamInitializationConstant)(Future.successful(())) processedRecords = mock.mergeBackedUpData(compression = compression.map(_.`type`)) asRecords <- Future.sequence(processedRecords.map { case (key, byteString) => Source @@ -284,7 +286,7 @@ trait BackupClientInterfaceTest mock.clear() val calculatedFuture = for { _ <- mock.backup.run() - _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) + _ <- pekko.pattern.after(PekkoStreamInitializationConstant)(Future.successful(())) processedRecords = mock.mergeBackedUpData(terminate = false, compression = compression.map(_.`type`)) } yield processedRecords.splitAt(processedRecords.length - 1) @@ -335,9 +337,9 @@ trait BackupClientInterfaceTest val calculatedFuture = for { _ <- waitForStartOfTimeUnit(ChronoUnit.MINUTES) _ <- mockOne.backup.run() - _ <- akka.pattern.after(AkkaStreamInitializationConstant)(mockTwo.backup.run()) + _ <- pekko.pattern.after(PekkoStreamInitializationConstant)(mockTwo.backup.run()) processedRecords <- - akka.pattern.after(AkkaStreamInitializationConstant)( + pekko.pattern.after(PekkoStreamInitializationConstant)( Future.successful( mockTwo.mergeBackedUpData() ) diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/CompressionSpec.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/CompressionSpec.scala index 1412f6a5..82dfc250 100644 --- a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/CompressionSpec.scala +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/CompressionSpec.scala @@ -1,14 +1,10 @@ package io.aiven.guardian.kafka.backup -import akka.actor.ActorSystem -import akka.stream.scaladsl.Compression -import akka.stream.scaladsl.Source -import akka.stream.scaladsl.SourceWithContext -import akka.util.ByteString -import io.aiven.guardian.akka.AkkaStreamTestKit -import io.aiven.guardian.akka.AnyPropTestKit import io.aiven.guardian.kafka.backup.configs.{Compression => CompressionModel} import io.aiven.guardian.kafka.models.Gzip +import io.aiven.guardian.pekko.AnyPropTestKit +import io.aiven.guardian.pekko.PekkoStreamTestKit +import org.apache.pekko import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.must.Matchers import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks @@ -17,16 +13,22 @@ import scala.concurrent.ExecutionContext import scala.concurrent.duration._ import scala.language.postfixOps +import pekko.actor.ActorSystem +import pekko.stream.scaladsl.Compression +import pekko.stream.scaladsl.Source +import pekko.stream.scaladsl.SourceWithContext +import pekko.util.ByteString + class CompressionSpec extends AnyPropTestKit(ActorSystem("CompressionSpec")) with Matchers with ScalaFutures with ScalaCheckPropertyChecks - with AkkaStreamTestKit { + with PekkoStreamTestKit { implicit val ec: ExecutionContext = system.dispatcher - // Due to akka-streams taking a while to initialize for the first time we need a longer + // Due to pekko-streams taking a while to initialize for the first time we need a longer // increase in the timeout implicit override val patienceConfig: PatienceConfig = PatienceConfig(10 seconds, 15 millis) diff --git 
a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/ConfigurationChangeRestartSpec.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/ConfigurationChangeRestartSpec.scala index d618555e..ffb0377b 100644 --- a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/ConfigurationChangeRestartSpec.scala +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/ConfigurationChangeRestartSpec.scala @@ -1,15 +1,8 @@ package io.aiven.guardian.kafka.backup -import akka.actor.ActorSystem -import akka.stream.scaladsl.Keep -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source -import akka.util.ByteString import com.softwaremill.diffx.generic.auto._ import com.softwaremill.diffx.scalatest.DiffMustMatcher._ import com.typesafe.scalalogging.StrictLogging -import io.aiven.guardian.akka.AkkaStreamTestKit -import io.aiven.guardian.akka.AnyPropTestKit import io.aiven.guardian.kafka.Generators.KafkaDataWithTimePeriod import io.aiven.guardian.kafka.Generators.kafkaDataWithTimePeriodsGen import io.aiven.guardian.kafka.TestUtils.waitForStartOfTimeUnit @@ -19,7 +12,10 @@ import io.aiven.guardian.kafka.codecs.Circe._ import io.aiven.guardian.kafka.models.BackupObjectMetadata import io.aiven.guardian.kafka.models.Gzip import io.aiven.guardian.kafka.models.ReducedConsumerRecord -import org.mdedetrich.akka.stream.support.CirceStreamSupport +import io.aiven.guardian.pekko.AnyPropTestKit +import io.aiven.guardian.pekko.PekkoStreamTestKit +import org.apache.pekko +import org.mdedetrich.pekko.stream.support.CirceStreamSupport import org.scalatest.Inspectors import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.must.Matchers @@ -36,9 +32,15 @@ import java.time.temporal.ChronoUnit import java.util.concurrent.ConcurrentLinkedDeque import java.util.concurrent.ConcurrentLinkedQueue +import pekko.actor.ActorSystem +import pekko.stream.scaladsl.Keep +import pekko.stream.scaladsl.Sink +import pekko.stream.scaladsl.Source +import pekko.util.ByteString + class ConfigurationChangeRestartSpec extends AnyPropTestKit(ActorSystem("ConfigurationChangeSpec")) - with AkkaStreamTestKit + with PekkoStreamTestKit with Matchers with ScalaFutures with ScalaCheckPropertyChecks @@ -81,14 +83,14 @@ class ConfigurationChangeRestartSpec val calculatedFuture = for { _ <- waitForStartOfTimeUnit(ChronoUnit.MINUTES) _ <- mockOne.backup.run() - keysWithGzip <- akka.pattern.after(AkkaStreamInitializationConstant)( + keysWithGzip <- pekko.pattern.after(PekkoStreamInitializationConstant)( Future.successful( backupStorage.asScala.map { case (key, _) => key }.toSet ) ) _ <- mockTwo.backup.run() keysWithoutGzip <- - akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful { + pekko.pattern.after(PekkoStreamInitializationConstant)(Future.successful { val allKeys = backupStorage.asScala.map { case (key, _) => key }.toSet allKeys diff keysWithGzip }) @@ -144,14 +146,14 @@ class ConfigurationChangeRestartSpec val calculatedFuture = for { _ <- waitForStartOfTimeUnit(ChronoUnit.MINUTES) _ <- mockOne.backup.run() - keysWithoutGzip <- akka.pattern.after(AkkaStreamInitializationConstant)( + keysWithoutGzip <- pekko.pattern.after(PekkoStreamInitializationConstant)( Future.successful( backupStorage.asScala.map { case (key, _) => key }.toSet ) ) _ <- mockTwo.backup.run() keysWithGzip <- - akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful { + pekko.pattern.after(PekkoStreamInitializationConstant)(Future.successful { val allKeys = backupStorage.asScala.map { case (key, 
_) => key }.toSet allKeys diff keysWithoutGzip }) diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/GzipCompressionBackupClientInterfaceSpec.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/GzipCompressionBackupClientInterfaceSpec.scala index 9df2854c..3c8139e8 100644 --- a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/GzipCompressionBackupClientInterfaceSpec.scala +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/GzipCompressionBackupClientInterfaceSpec.scala @@ -1,9 +1,9 @@ package io.aiven.guardian.kafka.backup -import akka.actor.ActorSystem -import io.aiven.guardian.akka.AnyPropTestKit import io.aiven.guardian.kafka.backup.configs.Compression import io.aiven.guardian.kafka.models.Gzip +import io.aiven.guardian.pekko.AnyPropTestKit +import org.apache.pekko.actor.ActorSystem class GzipCompressionBackupClientInterfaceSpec extends AnyPropTestKit(ActorSystem("GzipCompressionBackupClientInterfaceSpec")) diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala index e458df03..fd92de26 100644 --- a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedBackupClientInterface.scala @@ -1,14 +1,5 @@ package io.aiven.guardian.kafka.backup -import akka.Done -import akka.NotUsed -import akka.actor.ActorSystem -import akka.stream.scaladsl.Compression -import akka.stream.scaladsl.Flow -import akka.stream.scaladsl.Keep -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source -import akka.util.ByteString import io.aiven.guardian.kafka.TestUtils._ import io.aiven.guardian.kafka.Utils import io.aiven.guardian.kafka.backup.configs.Backup @@ -18,6 +9,7 @@ import io.aiven.guardian.kafka.models.BackupObjectMetadata import io.aiven.guardian.kafka.models.CompressionType import io.aiven.guardian.kafka.models.Gzip import io.aiven.guardian.kafka.models.ReducedConsumerRecord +import org.apache.pekko import scala.collection.immutable import scala.concurrent.Await @@ -29,6 +21,16 @@ import scala.language.postfixOps import java.util.concurrent.ConcurrentLinkedDeque import java.util.concurrent.ConcurrentLinkedQueue +import pekko.Done +import pekko.NotUsed +import pekko.actor.ActorSystem +import pekko.stream.scaladsl.Compression +import pekko.stream.scaladsl.Flow +import pekko.stream.scaladsl.Keep +import pekko.stream.scaladsl.Sink +import pekko.stream.scaladsl.Source +import pekko.util.ByteString + /** A mocked `BackupClientInterface` which given a `kafkaClientInterface` allows you to * * @param kafkaClientInterface diff --git a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedKafkaConsumerInterface.scala b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedKafkaConsumerInterface.scala index e4d840a0..978f5063 100644 --- a/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedKafkaConsumerInterface.scala +++ b/core-backup/src/test/scala/io/aiven/guardian/kafka/backup/MockedKafkaConsumerInterface.scala @@ -1,9 +1,7 @@ package io.aiven.guardian.kafka.backup -import akka.Done -import akka.NotUsed -import akka.stream.scaladsl._ import io.aiven.guardian.kafka.models.ReducedConsumerRecord +import org.apache.pekko import scala.collection.immutable import scala.concurrent.Future @@ -14,6 +12,10 @@ import java.time.temporal.ChronoUnit import 
java.util.concurrent.ConcurrentLinkedDeque import java.util.concurrent.atomic.AtomicReference +import pekko.Done +import pekko.NotUsed +import pekko.stream.scaladsl._ + /** A mocked `KafkaClientInterface` that returns a specific data as its source * * @param kafkaData @@ -45,7 +47,7 @@ class MockedKafkaConsumerInterface(kafkaData: Source[ReducedConsumerRecord, NotU override type Control = Future[NotUsed] /** The type that represents the result of the `combine` parameter that is supplied to - * [[akka.stream.scaladsl.Source.toMat]] + * [[pekko.stream.scaladsl.Source.toMat]] */ override type MatCombineResult = Future[NotUsed] @@ -100,7 +102,7 @@ class MockedKafkaConsumerInterface(kafkaData: Source[ReducedConsumerRecord, NotU /** @return * The result of this function gets directly passed into the `combine` parameter of - * [[akka.stream.scaladsl.Source.toMat]] + * [[pekko.stream.scaladsl.Source.toMat]] */ override def matCombine: (Future[NotUsed], Future[Done]) => Future[NotUsed] = Keep.left diff --git a/core-cli/src/main/resources/application.conf b/core-cli/src/main/resources/application.conf index 6836871a..048a6d0c 100644 --- a/core-cli/src/main/resources/application.conf +++ b/core-cli/src/main/resources/application.conf @@ -1,5 +1,5 @@ -akka { - loggers = ["akka.event.slf4j.Slf4jLogger"] +pekko { + loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"] loglevel = "INFO" - logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" + logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter" } diff --git a/core-cli/src/main/scala/io/aiven/guardian/cli/MainUtils.scala b/core-cli/src/main/scala/io/aiven/guardian/cli/MainUtils.scala index b3f6de1a..4ab8694b 100644 --- a/core-cli/src/main/scala/io/aiven/guardian/cli/MainUtils.scala +++ b/core-cli/src/main/scala/io/aiven/guardian/cli/MainUtils.scala @@ -20,7 +20,7 @@ object MainUtils { /** Hook that lets the user specify the future that will signal the shutdown of the server whenever completed. 
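
The `core-cli` application.conf hunk above moves the logger classes from `akka.event.slf4j` to `org.apache.pekko.event.slf4j`. A hedged way to sanity-check that block in isolation, assuming only the Typesafe Config library on the classpath (`LoggerConfigCheck` is an invented name):

```scala
import com.typesafe.config.ConfigFactory

object LoggerConfigCheck extends App {
  // Parse the renamed block exactly as it appears in core-cli's
  // application.conf after this change.
  val config = ConfigFactory.parseString(
    """
      |pekko {
      |  loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
      |  loglevel = "INFO"
      |  logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
      |}
      |""".stripMargin
  )

  // Both fully-qualified class names now live under the `pekko` root; a stale
  // `akka.*` class name here would fail at ActorSystem startup.
  println(config.getStringList("pekko.loggers"))
  println(config.getString("pekko.logging-filter"))
}
```
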
Adapted * from - * https://github.com/akka/akka-http/blob/main/akka-http/src/main/scala/akka/http/scaladsl/server/HttpApp.scala#L151-L163 + * https://github.com/apache/incubator-pekko-http/blob/94d1b1c153cc39216dae4217fd0e927f04d53cd2/http/src/main/scala/org/apache/pekko/http/scaladsl/server/HttpApp.scala#L164-L176 */ @SuppressWarnings( Array( diff --git a/core-cli/src/main/scala/io/aiven/guardian/cli/AkkaSettings.scala b/core-cli/src/main/scala/io/aiven/guardian/cli/PekkoSettings.scala similarity index 77% rename from core-cli/src/main/scala/io/aiven/guardian/cli/AkkaSettings.scala rename to core-cli/src/main/scala/io/aiven/guardian/cli/PekkoSettings.scala index f8e90a60..88880d77 100644 --- a/core-cli/src/main/scala/io/aiven/guardian/cli/AkkaSettings.scala +++ b/core-cli/src/main/scala/io/aiven/guardian/cli/PekkoSettings.scala @@ -1,10 +1,10 @@ package io.aiven.guardian.cli -import akka.actor.ActorSystem +import org.apache.pekko.actor.ActorSystem import scala.concurrent.ExecutionContext -trait AkkaSettings { +trait PekkoSettings { implicit val actorSystem: ActorSystem = ActorSystem() implicit val executionContext: ExecutionContext = ExecutionContext.global } diff --git a/core-compaction/src/main/scala/io/aiven/guardian/kafka/compaction/DatabaseInterface.scala b/core-compaction/src/main/scala/io/aiven/guardian/kafka/compaction/DatabaseInterface.scala index df14c101..f7cd8953 100644 --- a/core-compaction/src/main/scala/io/aiven/guardian/kafka/compaction/DatabaseInterface.scala +++ b/core-compaction/src/main/scala/io/aiven/guardian/kafka/compaction/DatabaseInterface.scala @@ -1,13 +1,15 @@ package io.aiven.guardian.kafka.compaction -import akka.NotUsed -import akka.stream.javadsl.Flow -import akka.stream.scaladsl.Source -import akka.util.ByteString import io.aiven.guardian.kafka.models.ReducedConsumerRecord +import org.apache.pekko import scala.concurrent.Future +import pekko.NotUsed +import pekko.stream.javadsl.Flow +import pekko.stream.scaladsl.Source +import pekko.util.ByteString + trait DatabaseInterface { /** Given a source of storage where Kafka messages are contained, stream it into a database. diff --git a/core-compaction/src/main/scala/io/aiven/guardian/kafka/compaction/PostgresJDBCDatabase.scala b/core-compaction/src/main/scala/io/aiven/guardian/kafka/compaction/PostgresJDBCDatabase.scala index 3127d47d..621531ad 100644 --- a/core-compaction/src/main/scala/io/aiven/guardian/kafka/compaction/PostgresJDBCDatabase.scala +++ b/core-compaction/src/main/scala/io/aiven/guardian/kafka/compaction/PostgresJDBCDatabase.scala @@ -1,13 +1,7 @@ package io.aiven.guardian.kafka.compaction -import akka.NotUsed -import akka.stream.ActorAttributes -import akka.stream.Materializer -import akka.stream.javadsl.Flow -import akka.stream.scaladsl.Source -import akka.stream.scaladsl.StreamConverters -import akka.util.ByteString import io.aiven.guardian.kafka.models.ReducedConsumerRecord +import org.apache.pekko import org.postgresql.copy.CopyManager import org.postgresql.core.BaseConnection @@ -17,6 +11,14 @@ import scala.concurrent.blocking import java.sql.Connection +import pekko.NotUsed +import pekko.stream.ActorAttributes +import pekko.stream.Materializer +import pekko.stream.javadsl.Flow +import pekko.stream.scaladsl.Source +import pekko.stream.scaladsl.StreamConverters +import pekko.util.ByteString + /** A Postgres Database backed by JDBC which uses the Postgres COPY command to insert data into the database. 
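
Because the `PostgresJDBCDatabase` comment above stresses that `CopyManager` is blocking under the hood, its imports (`ActorAttributes`, `scala.concurrent.blocking`, `StreamConverters`) point at a standard pattern worth sketching. The snippet below simulates the blocking COPY call with `Thread.sleep` instead of opening a real JDBC connection, and it assumes Pekko's stock blocking-IO dispatcher path:

```scala
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.stream.ActorAttributes
import pekko.stream.scaladsl.{Sink, Source}

import scala.concurrent.blocking

object BlockingCopySketch extends App {
  implicit val system: ActorSystem = ActorSystem("example")
  import system.dispatcher

  Source(1 to 3)
    // Wrap the blocking work (a CopyManager.copyIn call in the real client)
    // so the runtime knows this stage may park a thread.
    .map(batch => blocking { Thread.sleep(100); s"copied batch $batch" })
    // Run this section of the stream on a dispatcher meant for blocking IO
    // rather than starving the default fork-join pool.
    .withAttributes(ActorAttributes.dispatcher("pekko.actor.default-blocking-io-dispatcher"))
    .runWith(Sink.foreach(println))
    .andThen { case _ => system.terminate() }
}
```
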
Note that * since this uses JDBC and CopyManager, its implementation is blocking under the hood. * @param scheduler diff --git a/core-compaction/src/main/scala/io/aiven/guardian/kafka/compaction/StorageInterface.scala b/core-compaction/src/main/scala/io/aiven/guardian/kafka/compaction/StorageInterface.scala index 8d03061f..d784c4f9 100644 --- a/core-compaction/src/main/scala/io/aiven/guardian/kafka/compaction/StorageInterface.scala +++ b/core-compaction/src/main/scala/io/aiven/guardian/kafka/compaction/StorageInterface.scala @@ -1,8 +1,10 @@ package io.aiven.guardian.kafka.compaction -import akka.NotUsed -import akka.stream.scaladsl.Source import io.aiven.guardian.kafka.models.ReducedConsumerRecord +import org.apache.pekko + +import pekko.NotUsed +import pekko.stream.scaladsl.Source trait StorageInterface { diff --git a/core-restore/src/main/resources/reference.conf b/core-restore/src/main/resources/reference.conf index 510d4f59..9040d658 100644 --- a/core-restore/src/main/resources/reference.conf +++ b/core-restore/src/main/resources/reference.conf @@ -1,12 +1,12 @@ -akka.kafka.producer { - discovery-method = ${?AKKA_KAFKA_PRODUCER_DISCOVERY_METHOD} - service-name = ${?AKKA_KAFKA_PRODUCER_SERVICE_NAME} - resolve-timeout = ${?AKKA_KAFKA_PRODUCER_RESOLVE_TIMEOUT} - parallelism = ${?AKKA_KAFKA_PRODUCER_PARALLELISM} - close-timeout = ${?AKKA_KAFKA_PRODUCER_CLOSE_TIMEOUT} - close-on-producer-stop = ${?AKKA_KAFKA_PRODUCER_CLOSE_ON_PRODUCER_STOP} - use-dispatcher = ${?AKKA_KAFKA_PRODUCER_USE_DISPATCHER} - eos-commit-interval = ${?AKKA_KAFKA_PRODUCER_EOS_COMMIT_INTERVAL} +pekko.kafka.producer { + discovery-method = ${?PEKKO_KAFKA_PRODUCER_DISCOVERY_METHOD} + service-name = ${?PEKKO_KAFKA_PRODUCER_SERVICE_NAME} + resolve-timeout = ${?PEKKO_KAFKA_PRODUCER_RESOLVE_TIMEOUT} + parallelism = ${?PEKKO_KAFKA_PRODUCER_PARALLELISM} + close-timeout = ${?PEKKO_KAFKA_PRODUCER_CLOSE_TIMEOUT} + close-on-producer-stop = ${?PEKKO_KAFKA_PRODUCER_CLOSE_ON_PRODUCER_STOP} + use-dispatcher = ${?PEKKO_KAFKA_PRODUCER_USE_DISPATCHER} + eos-commit-interval = ${?PEKKO_KAFKA_PRODUCER_EOS_COMMIT_INTERVAL} } restore { diff --git a/core-restore/src/main/scala/io/aiven/guardian/kafka/restore/KafkaProducer.scala b/core-restore/src/main/scala/io/aiven/guardian/kafka/restore/KafkaProducer.scala index 896d1e40..4f5af617 100644 --- a/core-restore/src/main/scala/io/aiven/guardian/kafka/restore/KafkaProducer.scala +++ b/core-restore/src/main/scala/io/aiven/guardian/kafka/restore/KafkaProducer.scala @@ -1,19 +1,21 @@ package io.aiven.guardian.kafka.restore -import akka.Done -import akka.actor.ActorSystem -import akka.kafka.ProducerSettings -import akka.kafka.scaladsl.Producer -import akka.stream.scaladsl.Sink import io.aiven.guardian.kafka.models.ReducedConsumerRecord import io.aiven.guardian.kafka.restore.configs.Restore import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.serialization.ByteArraySerializer +import org.apache.pekko import scala.concurrent.Future import java.util.Base64 +import pekko.Done +import pekko.actor.ActorSystem +import pekko.kafka.ProducerSettings +import pekko.kafka.scaladsl.Producer +import pekko.stream.scaladsl.Sink + class KafkaProducer( configureProducer: Option[ ProducerSettings[Array[Byte], Array[Byte]] => ProducerSettings[Array[Byte], Array[Byte]] diff --git a/core-restore/src/main/scala/io/aiven/guardian/kafka/restore/KafkaProducerInterface.scala b/core-restore/src/main/scala/io/aiven/guardian/kafka/restore/KafkaProducerInterface.scala index 48f0b855..d7273747 100644 
--- a/core-restore/src/main/scala/io/aiven/guardian/kafka/restore/KafkaProducerInterface.scala +++ b/core-restore/src/main/scala/io/aiven/guardian/kafka/restore/KafkaProducerInterface.scala @@ -1,11 +1,13 @@ package io.aiven.guardian.kafka.restore -import akka.Done -import akka.stream.scaladsl.Sink import io.aiven.guardian.kafka.models.ReducedConsumerRecord +import org.apache.pekko import scala.concurrent.Future +import pekko.Done +import pekko.stream.scaladsl.Sink + trait KafkaProducerInterface { def getSink: Sink[ReducedConsumerRecord, Future[Done]] } diff --git a/core-restore/src/main/scala/io/aiven/guardian/kafka/restore/RestoreClientInterface.scala b/core-restore/src/main/scala/io/aiven/guardian/kafka/restore/RestoreClientInterface.scala index 8ac57879..9fcc5878 100644 --- a/core-restore/src/main/scala/io/aiven/guardian/kafka/restore/RestoreClientInterface.scala +++ b/core-restore/src/main/scala/io/aiven/guardian/kafka/restore/RestoreClientInterface.scala @@ -1,18 +1,5 @@ package io.aiven.guardian.kafka.restore -import akka.Done -import akka.NotUsed -import akka.actor.ActorSystem -import akka.stream.Attributes -import akka.stream.KillSwitches -import akka.stream.UniqueKillSwitch -import akka.stream.scaladsl.Compression -import akka.stream.scaladsl.Concat -import akka.stream.scaladsl.Flow -import akka.stream.scaladsl.Keep -import akka.stream.scaladsl.RunnableGraph -import akka.stream.scaladsl.Source -import akka.util.ByteString import com.typesafe.scalalogging.LazyLogging import io.aiven.guardian.kafka.ExtensionsMethods._ import io.aiven.guardian.kafka.Utils @@ -22,7 +9,8 @@ import io.aiven.guardian.kafka.models.BackupObjectMetadata import io.aiven.guardian.kafka.models.Gzip import io.aiven.guardian.kafka.models.ReducedConsumerRecord import io.aiven.guardian.kafka.restore.configs.Restore -import org.mdedetrich.akka.stream.support.CirceStreamSupport +import org.apache.pekko +import org.mdedetrich.pekko.stream.support.CirceStreamSupport import org.typelevel.jawn.AsyncParser import scala.concurrent.ExecutionContext @@ -30,6 +18,20 @@ import scala.concurrent.Future import java.time.OffsetDateTime +import pekko.Done +import pekko.NotUsed +import pekko.actor.ActorSystem +import pekko.stream.Attributes +import pekko.stream.KillSwitches +import pekko.stream.UniqueKillSwitch +import pekko.stream.scaladsl.Compression +import pekko.stream.scaladsl.Concat +import pekko.stream.scaladsl.Flow +import pekko.stream.scaladsl.Keep +import pekko.stream.scaladsl.RunnableGraph +import pekko.stream.scaladsl.Source +import pekko.util.ByteString + trait RestoreClientInterface[T <: KafkaProducerInterface] extends LazyLogging { implicit val kafkaProducerInterface: T implicit val restoreConfig: Restore diff --git a/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/GzipCompressionRestoreClientInterfaceSpec.scala b/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/GzipCompressionRestoreClientInterfaceSpec.scala index 20ea2fa0..0271e140 100644 --- a/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/GzipCompressionRestoreClientInterfaceSpec.scala +++ b/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/GzipCompressionRestoreClientInterfaceSpec.scala @@ -1,9 +1,9 @@ package io.aiven.guardian.kafka.restore -import akka.actor.ActorSystem -import io.aiven.guardian.akka.AnyPropTestKit import io.aiven.guardian.kafka.backup.configs.Compression import io.aiven.guardian.kafka.models.Gzip +import io.aiven.guardian.pekko.AnyPropTestKit +import org.apache.pekko.actor.ActorSystem class 
GzipCompressionRestoreClientInterfaceSpec extends AnyPropTestKit(ActorSystem("GzipCompressionRestoreClientInterfaceSpec")) diff --git a/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/MockedKafkaProducerInterface.scala b/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/MockedKafkaProducerInterface.scala index 86fc40c2..a3ac8793 100644 --- a/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/MockedKafkaProducerInterface.scala +++ b/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/MockedKafkaProducerInterface.scala @@ -1,12 +1,15 @@ package io.aiven.guardian.kafka.restore -import akka.Done -import akka.stream.scaladsl.Sink + import io.aiven.guardian.kafka.models.ReducedConsumerRecord +import org.apache.pekko import scala.concurrent.Future import java.util.concurrent.ConcurrentLinkedQueue +import pekko.Done +import pekko.stream.scaladsl.Sink + class MockedKafkaProducerInterface() extends KafkaProducerInterface { val producedData: ConcurrentLinkedQueue[ReducedConsumerRecord] = new ConcurrentLinkedQueue[ReducedConsumerRecord]() diff --git a/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/MockedRestoreClientInterface.scala b/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/MockedRestoreClientInterface.scala index 0ad899ae..3904b122 100644 --- a/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/MockedRestoreClientInterface.scala +++ b/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/MockedRestoreClientInterface.scala @@ -1,14 +1,16 @@ package io.aiven.guardian.kafka.restore -import akka.NotUsed -import akka.actor.ActorSystem -import akka.stream.scaladsl.Flow -import akka.util.ByteString import io.aiven.guardian.kafka.configs.KafkaCluster import io.aiven.guardian.kafka.restore.configs.Restore +import org.apache.pekko import scala.concurrent.Future +import pekko.NotUsed +import pekko.actor.ActorSystem +import pekko.stream.scaladsl.Flow +import pekko.util.ByteString + class MockedRestoreClientInterface(backupData: Map[String, ByteString])(implicit override val kafkaProducerInterface: MockedKafkaProducerInterface, override val restoreConfig: Restore, diff --git a/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceSpec.scala b/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceSpec.scala index 964dfd05..e86689ee 100644 --- a/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceSpec.scala +++ b/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceSpec.scala @@ -1,8 +1,8 @@ package io.aiven.guardian.kafka.restore -import akka.actor.ActorSystem -import io.aiven.guardian.akka.AnyPropTestKit import io.aiven.guardian.kafka.backup.configs.Compression +import io.aiven.guardian.pekko.AnyPropTestKit +import org.apache.pekko.actor.ActorSystem class RestoreClientInterfaceSpec extends AnyPropTestKit(ActorSystem("RestoreClientInterfaceSpec")) diff --git a/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceTest.scala b/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceTest.scala index e4e6852f..625e3a50 100644 --- a/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceTest.scala +++ b/core-restore/src/test/scala/io/aiven/guardian/kafka/restore/RestoreClientInterfaceTest.scala @@ -1,10 +1,8 @@ package io.aiven.guardian.kafka.restore -import akka.stream.scaladsl.Source import com.softwaremill.diffx.generic.auto._ import 
com.softwaremill.diffx.scalatest.DiffMustMatcher._ import com.typesafe.scalalogging.StrictLogging -import io.aiven.guardian.akka.AkkaStreamTestKit import io.aiven.guardian.kafka.ExtensionsMethods._ import io.aiven.guardian.kafka.Generators._ import io.aiven.guardian.kafka.Utils @@ -13,6 +11,8 @@ import io.aiven.guardian.kafka.backup.configs.Compression import io.aiven.guardian.kafka.backup.configs.PeriodFromFirst import io.aiven.guardian.kafka.configs.{KafkaCluster => KafkaClusterConfig} import io.aiven.guardian.kafka.restore.configs.{Restore => RestoreConfig} +import io.aiven.guardian.pekko.PekkoStreamTestKit +import org.apache.pekko import org.scalatest.Inspectors import org.scalatest.concurrent.ScalaFutures import org.scalatest.matchers.must.Matchers @@ -28,9 +28,11 @@ import scala.language.postfixOps import java.time.temporal.ChronoUnit +import pekko.stream.scaladsl.Source + trait RestoreClientInterfaceTest extends AnyPropSpecLike - with AkkaStreamTestKit + with PekkoStreamTestKit with Matchers with ScalaFutures with ScalaCheckPropertyChecks @@ -67,7 +69,7 @@ trait RestoreClientInterfaceTest backupMock.clear() val calculatedFuture = for { _ <- backupMock.backup.run() - _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) + _ <- pekko.pattern.after(PekkoStreamInitializationConstant)(Future.successful(())) processedRecords = backupMock.mergeBackedUpData(compression = compression.map(_.`type`)) restoreMock = new MockedRestoreClientInterface(processedRecords.toMap) keys <- restoreMock.finalKeys @@ -97,7 +99,7 @@ trait RestoreClientInterfaceTest backupMock.clear() val calculatedFuture = for { _ <- backupMock.backup.run() - _ <- akka.pattern.after(10 seconds)(Future.successful(())) + _ <- pekko.pattern.after(10 seconds)(Future.successful(())) processedRecords = backupMock.mergeBackedUpData() restoreMock = new MockedRestoreClientInterface(processedRecords.toMap) (_, runFuture) = restoreMock.restore.run() @@ -134,7 +136,7 @@ trait RestoreClientInterfaceTest backupMock.clear() val calculatedFuture = for { _ <- backupMock.backup.run() - _ <- akka.pattern.after(AkkaStreamInitializationConstant)(Future.successful(())) + _ <- pekko.pattern.after(PekkoStreamInitializationConstant)(Future.successful(())) processedRecords = backupMock.mergeBackedUpData() restoreMock = new MockedRestoreClientInterface(processedRecords.toMap) (_, runFuture) = restoreMock.restore.run() diff --git a/core-s3/src/main/resources/reference.conf b/core-s3/src/main/resources/reference.conf index 95b25db5..1d4296e0 100644 --- a/core-s3/src/main/resources/reference.conf +++ b/core-s3/src/main/resources/reference.conf @@ -1,43 +1,43 @@ -alpakka.s3 { - buffer = ${?ALPAKKA_S3_BUFFER} - disk-buffer-path = ${?ALPAKKA_S3_DISK_BUFFER_PATH} +pekko.connectors.s3 { + buffer = ${?PEKKO_CONNECTORS_S3_BUFFER} + disk-buffer-path = ${?PEKKO_CONNECTORS_S3_DISK_BUFFER_PATH} forward-proxy { - scheme = ${?ALPAKKA_S3_FORWARD_PROXY_SCHEME} - host = ${?ALPAKKA_S3_FORWARD_PROXY_HOST} - port = ${?ALPAKKA_S3_FORWARD_PROXY_PORT} + scheme = ${?PEKKO_CONNECTORS_S3_FORWARD_PROXY_SCHEME} + host = ${?PEKKO_CONNECTORS_S3_FORWARD_PROXY_HOST} + port = ${?PEKKO_CONNECTORS_S3_FORWARD_PROXY_PORT} credentials { - username = ${?ALPAKKA_S3_FORWARD_PROXY_CREDENTIALS_USERNAME} - password = ${?ALPAKKA_S3_FORWARD_PROXY_CREDENTIALS_PASSWORD} + username = ${?PEKKO_CONNECTORS_S3_FORWARD_PROXY_CREDENTIALS_USERNAME} + password = ${?PEKKO_CONNECTORS_S3_FORWARD_PROXY_CREDENTIALS_PASSWORD} } } aws { credentials { - access-key-id = 
${?ALPAKKA_S3_AWS_CREDENTIALS_ACCESS_KEY_ID} - secret-access-key = ${?ALPAKKA_S3_AWS_CREDENTIALS_SECRET_ACCESS_KEY} - token = ${?ALPAKKA_S3_AWS_CREDENTIALS_TOKEN} - provider = ${?ALPAKKA_S3_AWS_CREDENTIALS_PROVIDER} + access-key-id = ${?PEKKO_CONNECTORS_S3_AWS_CREDENTIALS_ACCESS_KEY_ID} + secret-access-key = ${?PEKKO_CONNECTORS_S3_AWS_CREDENTIALS_SECRET_ACCESS_KEY} + token = ${?PEKKO_CONNECTORS_S3_AWS_CREDENTIALS_TOKEN} + provider = ${?PEKKO_CONNECTORS_S3_AWS_CREDENTIALS_PROVIDER} } region { - default-region = ${?ALPAKKA_S3_REGION_DEFAULT_REGION} - provider = ${?ALPAKKA_S3_REGION_PROVIDER} + default-region = ${?PEKKO_CONNECTORS_S3_REGION_DEFAULT_REGION} + provider = ${?PEKKO_CONNECTORS_S3_REGION_PROVIDER} } } - path-style-access = ${?ALPAKKA_S3_PATH_STYLE_ACCESS} - access-style = ${?ALPAKKA_S3_ACCESS_STYLE} - endpoint-url = ${?ALPAKKA_S3_ENDPOINT_URL} - list-bucket-api-version = ${?ALPAKKA_S3_LIST_BUCKET_API_VERSION} - validate-object-key = ${?ALPAKKA_S3_VALIDATE_OBJECT_KEY} + path-style-access = ${?PEKKO_CONNECTORS_S3_PATH_STYLE_ACCESS} + access-style = ${?PEKKO_CONNECTORS_S3_ACCESS_STYLE} + endpoint-url = ${?PEKKO_CONNECTORS_S3_ENDPOINT_URL} + list-bucket-api-version = ${?PEKKO_CONNECTORS_S3_LIST_BUCKET_API_VERSION} + validate-object-key = ${?PEKKO_CONNECTORS_S3_VALIDATE_OBJECT_KEY} retry-settings { - max-retries = ${?ALPAKKA_S3_RETRY_SETTINGS_MAX_RETRIES} - min-backoff = ${?ALPAKKA_S3_RETRY_SETTINGS_MIN_BACKOFF} - max-backoff = ${?ALPAKKA_S3_RETRY_SETTINGS_MAX_BACKOFF} - random-factor = ${?ALPAKKA_S3_RETRY_SETTINGS_RANDOM_FACTOR} + max-retries = ${?PEKKO_CONNECTORS_S3_RETRY_SETTINGS_MAX_RETRIES} + min-backoff = ${?PEKKO_CONNECTORS_S3_RETRY_SETTINGS_MIN_BACKOFF} + max-backoff = ${?PEKKO_CONNECTORS_S3_RETRY_SETTINGS_MAX_BACKOFF} + random-factor = ${?PEKKO_CONNECTORS_S3_RETRY_SETTINGS_RANDOM_FACTOR} } } diff --git a/core-s3/src/main/scala/io/aiven/guardian/kafka/s3/Config.scala b/core-s3/src/main/scala/io/aiven/guardian/kafka/s3/Config.scala index 04b3dea2..1c0c5657 100644 --- a/core-s3/src/main/scala/io/aiven/guardian/kafka/s3/Config.scala +++ b/core-s3/src/main/scala/io/aiven/guardian/kafka/s3/Config.scala @@ -1,14 +1,9 @@ package io.aiven.guardian.kafka package s3 -import akka.stream.RestartSettings -import akka.stream.alpakka.s3.MetaHeaders -import akka.stream.alpakka.s3.S3Headers -import akka.stream.alpakka.s3.headers.CannedAcl -import akka.stream.alpakka.s3.headers.ServerSideEncryption -import akka.stream.alpakka.s3.headers.StorageClass import io.aiven.guardian.kafka.PureConfigUtils._ import io.aiven.guardian.kafka.s3.configs.S3 +import org.apache.pekko import pureconfig.ConfigCursor import pureconfig.ConfigReader import pureconfig.ConfigReader._ @@ -18,9 +13,16 @@ import pureconfig.error.UserValidationFailed import scala.annotation.nowarn import scala.concurrent.duration.FiniteDuration +import pekko.stream.RestartSettings +import pekko.stream.connectors.s3.MetaHeaders +import pekko.stream.connectors.s3.S3Headers +import pekko.stream.connectors.s3.headers.CannedAcl +import pekko.stream.connectors.s3.headers.ServerSideEncryption +import pekko.stream.connectors.s3.headers.StorageClass + trait Config { - // TODO Unfortunately the following boilerplate is here because the S3 Alpakka providers no public constructors + // TODO Unfortunately the following boilerplate is here because the Pekko Connectors S3 module provides no public constructors // for S3Headers apart from the limited S3Headers(). 
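
One detail worth spelling out about the renamed block above: every `${?VAR}` entry is a HOCON optional substitution, so a key keeps whatever default the library ships unless the matching environment variable is set. That is what lets the CI workflow at the top of this diff export `PEKKO_CONNECTORS_S3_*` values while local runs stay untouched. A minimal sketch of that behaviour, with an inline default added purely for illustration:

```scala
import com.typesafe.config.ConfigFactory

object OptionalEnvOverride extends App {
  // Mirrors the reference.conf idiom: a baseline value plus a `${?VAR}` line
  // that only overrides it when the environment variable exists.
  val config = ConfigFactory.parseString(
    """
      |pekko.connectors.s3.aws.region.default-region = "us-west-2"
      |pekko.connectors.s3.aws.region.default-region = ${?PEKKO_CONNECTORS_S3_REGION_DEFAULT_REGION}
      |""".stripMargin
  ).resolve() // resolution falls back to environment variables for `${?...}`

  // Prints "us-west-2" unless PEKKO_CONNECTORS_S3_REGION_DEFAULT_REGION is set.
  println(config.getString("pekko.connectors.s3.aws.region.default-region"))
}
```
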
This means we can't use pureconfig.generic.auto._ and hence // we have to write this out manually diff --git a/core-s3/src/main/scala/io/aiven/guardian/kafka/s3/configs/S3.scala b/core-s3/src/main/scala/io/aiven/guardian/kafka/s3/configs/S3.scala index 5f5880d6..a63a5237 100644 --- a/core-s3/src/main/scala/io/aiven/guardian/kafka/s3/configs/S3.scala +++ b/core-s3/src/main/scala/io/aiven/guardian/kafka/s3/configs/S3.scala @@ -1,6 +1,6 @@ package io.aiven.guardian.kafka.s3.configs -import akka.stream.RestartSettings +import org.apache.pekko.stream.RestartSettings /** S3 specific configuration used when storing Kafka ConsumerRecords to a S3 bucket * @@ -9,7 +9,7 @@ import akka.stream.RestartSettings * @param dataBucketPrefix * Prefix for the data bucket (if any) * @param errorRestartSettings - * Restart settings that are used whenever an akka-stream encounters an error + * Restart settings that are used whenever a pekko-stream encounters an error */ final case class S3(dataBucket: String, dataBucketPrefix: Option[String], errorRestartSettings: RestartSettings) diff --git a/core-s3/src/main/scala/io/aiven/guardian/kafka/s3/errors/S3Errors.scala b/core-s3/src/main/scala/io/aiven/guardian/kafka/s3/errors/S3Errors.scala index 96a7ccbf..66bc8596 100644 --- a/core-s3/src/main/scala/io/aiven/guardian/kafka/s3/errors/S3Errors.scala +++ b/core-s3/src/main/scala/io/aiven/guardian/kafka/s3/errors/S3Errors.scala @@ -1,8 +1,10 @@ package io.aiven.guardian.kafka.s3.errors -import akka.http.scaladsl.model.headers.ByteRange -import akka.stream.alpakka.s3.S3Headers import io.aiven.guardian.kafka.Errors +import org.apache.pekko + +import pekko.http.scaladsl.model.headers.ByteRange +import pekko.stream.connectors.s3.S3Headers sealed abstract class S3Errors extends Errors diff --git a/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/Generators.scala b/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/Generators.scala index bf562519..995d141a 100644 --- a/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/Generators.scala +++ b/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/Generators.scala @@ -1,7 +1,7 @@ package io.aiven.guardian.kafka.s3 -import akka.stream.RestartSettings import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config} +import org.apache.pekko.stream.RestartSettings import org.scalacheck.Gen import scala.annotation.nowarn diff --git a/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/Main.scala b/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/Main.scala index 12f31008..4192aec5 100644 --- a/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/Main.scala +++ b/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/Main.scala @@ -1,11 +1,5 @@ package io.aiven.guardian.kafka.s3 -import akka.actor.ActorSystem -import akka.stream.Attributes -import akka.stream.alpakka.s3.S3Attributes -import akka.stream.alpakka.s3.S3Settings -import akka.stream.alpakka.s3.scaladsl.S3 -import akka.stream.scaladsl.Sink import cats.data.NonEmptyList import cats.implicits._ import com.monovore.decline.Command @@ -13,13 +7,19 @@ import com.monovore.decline.CommandApp import com.monovore.decline.Opts import com.typesafe.scalalogging.LazyLogging import io.aiven.guardian.kafka.s3.Entry.computeAndDeleteBuckets -import software.amazon.awssdk.regions.Region -import software.amazon.awssdk.regions.providers.AwsRegionProvider +import org.apache.pekko import scala.concurrent._ import scala.concurrent.duration._ import scala.util.control.NonFatal +import pekko.actor.ActorSystem +import pekko.stream.Attributes +import 
pekko.stream.connectors.s3.S3Attributes +import pekko.stream.connectors.s3.S3Settings +import pekko.stream.connectors.s3.scaladsl.S3 +import pekko.stream.scaladsl.Sink + class Entry extends CommandApp( name = "guardian-s3-test-utils", @@ -80,12 +80,8 @@ object Entry extends LazyLogging { system: ActorSystem, s3Settings: S3Settings ): Future[Set[String]] = - // Bug that needs to be fixed upstream in Alpakka, this specific S3 api call is not region specific - // so US_EAST_1 needs to be hardcoded S3.listBuckets() - .withAttributes(S3Attributes.settings(s3Settings.withS3RegionProvider(new AwsRegionProvider { - val getRegion: Region = Region.US_EAST_1 - }))) + .withAttributes(S3Attributes.settings(s3Settings)) .runWith(Sink.seq) .map { allBuckets => allBuckets.map(_.name).toSet.filter(fromS3Bucket => fromS3Bucket.startsWith(bucketPrefix)).diff(excludeBuckets) diff --git a/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/MinioS3Test.scala b/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/MinioS3Test.scala index eb253b5d..4cd793a5 100644 --- a/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/MinioS3Test.scala +++ b/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/MinioS3Test.scala @@ -1,15 +1,17 @@ package io.aiven.guardian.kafka.s3 -import akka.stream.alpakka.s3.AccessStyle -import akka.stream.alpakka.s3.S3Settings -import akka.testkit.TestKitBase import com.dimafeng.testcontainers.ForAllTestContainer +import org.apache.pekko import org.scalatest.Suite import software.amazon.awssdk.auth.credentials.AwsBasicCredentials import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider import software.amazon.awssdk.regions.Region import software.amazon.awssdk.regions.providers.AwsRegionProvider +import pekko.stream.connectors.s3.AccessStyle +import pekko.stream.connectors.s3.S3Settings +import pekko.testkit.TestKitBase + trait MinioS3Test extends ForAllTestContainer with TestKitBase { this: Suite => private val S3DummyAccessKey = "DUMMY_ACCESS_KEY" private val S3DummySecretKey = "DUMMY_SECRET_KEY" diff --git a/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/PureConfigS3HeadersSpec.scala b/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/PureConfigS3HeadersSpec.scala index 269144ac..4c33d3bf 100644 --- a/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/PureConfigS3HeadersSpec.scala +++ b/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/PureConfigS3HeadersSpec.scala @@ -1,11 +1,9 @@ package io.aiven.guardian.kafka.s3 -import akka.stream.alpakka.s3.MetaHeaders -import akka.stream.alpakka.s3.S3Headers -import akka.stream.alpakka.s3.headers._ import com.softwaremill.diffx.generic.auto._ import com.softwaremill.diffx.scalatest.DiffMustMatcher._ import io.aiven.guardian.kafka.s3.Config._ +import org.apache.pekko import org.scalacheck.Arbitrary import org.scalacheck.Gen import org.scalatest.matchers.must.Matchers @@ -14,6 +12,10 @@ import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks import pureconfig.ConfigReader.Result import pureconfig.ConfigSource +import pekko.stream.connectors.s3.MetaHeaders +import pekko.stream.connectors.s3.S3Headers +import pekko.stream.connectors.s3.headers._ + class PureConfigS3HeadersSpec extends AnyPropSpec with Matchers with ScalaCheckPropertyChecks { implicit val cannedAclArb: Arbitrary[CannedAcl] = Arbitrary( Gen.oneOf( diff --git a/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/S3Spec.scala b/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/S3Spec.scala index 5ce55257..c91f7547 100644 --- 
a/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/S3Spec.scala +++ b/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/S3Spec.scala @@ -1,22 +1,12 @@ package io.aiven.guardian.kafka.s3 -import akka.NotUsed -import akka.actor.Scheduler -import akka.stream.Attributes -import akka.stream.alpakka.s3.BucketAccess -import akka.stream.alpakka.s3.ListBucketResultContents -import akka.stream.alpakka.s3.S3Attributes -import akka.stream.alpakka.s3.S3Settings -import akka.stream.alpakka.s3.scaladsl.S3 -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source -import akka.testkit.TestKitBase import com.softwaremill.diffx.ShowConfig import com.typesafe.scalalogging.LazyLogging -import io.aiven.guardian.akka.AkkaHttpTestKit import io.aiven.guardian.kafka.TestUtils import io.aiven.guardian.kafka.models.ReducedConsumerRecord +import io.aiven.guardian.pekko.PekkoHttpTestKit import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.pekko import org.scalactic.Prettifier import org.scalactic.SizeLimit import org.scalatest.Ignore @@ -35,10 +25,22 @@ import scala.language.postfixOps import java.util.Base64 import java.util.concurrent.ConcurrentLinkedQueue +import pekko.NotUsed +import pekko.actor.Scheduler +import pekko.stream.Attributes +import pekko.stream.connectors.s3.BucketAccess +import pekko.stream.connectors.s3.ListBucketResultContents +import pekko.stream.connectors.s3.S3Attributes +import pekko.stream.connectors.s3.S3Settings +import pekko.stream.connectors.s3.scaladsl.S3 +import pekko.stream.scaladsl.Sink +import pekko.stream.scaladsl.Source +import pekko.testkit.TestKitBase + trait S3Spec extends TestKitBase with AnyPropSpecLike - with AkkaHttpTestKit + with PekkoHttpTestKit with ScalaCheckPropertyChecks with ScalaFutures with Config @@ -132,7 +134,7 @@ trait S3Spec try enableCleanup match { case Some(initialDelay) => - Await.result(akka.pattern.after(initialDelay)( + Await.result(pekko.pattern.after(initialDelay)( Future.sequence(bucketsToCleanup.asScala.toList.distinct.map(cleanBucket)) ), maxCleanupTimeout @@ -167,7 +169,7 @@ trait S3Spec transformResult } - akka.pattern.retry(attempt, attempts, delay) + pekko.pattern.retry(attempt, attempts, delay) } case class DownloadNotReady(downloads: Seq[ListBucketResultContents]) diff --git a/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/S3TestUtils.scala b/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/S3TestUtils.scala index 84e58eb1..01993216 100644 --- a/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/S3TestUtils.scala +++ b/core-s3/src/test/scala/io/aiven/guardian/kafka/s3/S3TestUtils.scala @@ -1,17 +1,19 @@ package io.aiven.guardian.kafka.s3 -import akka.actor.ActorSystem -import akka.stream.Attributes -import akka.stream.alpakka.s3.scaladsl.S3 -import akka.stream.scaladsl.Sink import com.typesafe.scalalogging.StrictLogging import markatta.futiles.Retry +import org.apache.pekko import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration._ import scala.language.postfixOps +import pekko.actor.ActorSystem +import pekko.stream.Attributes +import pekko.stream.connectors.s3.scaladsl.S3 +import pekko.stream.scaladsl.Sink + object S3TestUtils extends StrictLogging { /** Completely cleans a bucket contents as well as deleting it afterwards. 
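
The `S3Spec` hunks above swap `akka.pattern.retry` and `akka.pattern.after` for their `pekko.pattern` equivalents, which keep the same signatures. A self-contained sketch of the retry helper follows; the flaky operation and the attempt counts are invented for the example:

```scala
import org.apache.pekko
import pekko.actor.{ActorSystem, Scheduler}

import scala.concurrent.Future
import scala.concurrent.duration._

object RetrySketch extends App {
  implicit val system: ActorSystem = ActorSystem("example")
  import system.dispatcher
  implicit val scheduler: Scheduler = system.scheduler

  // Fails twice before succeeding, standing in for an S3 call that is not
  // immediately consistent (for instance, waiting for an upload to be listable).
  var calls = 0
  def flakyOperation(): Future[String] = {
    calls += 1
    if (calls < 3) Future.failed(new IllegalStateException(s"attempt $calls not ready"))
    else Future.successful(s"ready after $calls attempts")
  }

  // The same call shape S3Spec uses: retry the thunk up to `attempts` times,
  // waiting `delay` between attempts.
  pekko.pattern
    .retry(() => flakyOperation(), attempts = 5, delay = 200.millis)
    .foreach { result =>
      println(result)
      system.terminate()
    }
}
```
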
diff --git a/core-s3/src/test/scala/akka/stream/alpakka/s3/GeneratorsSpec.scala b/core-s3/src/test/scala/org/apache/pekko/stream/connectors/s3/GeneratorsSpec.scala similarity index 98% rename from core-s3/src/test/scala/akka/stream/alpakka/s3/GeneratorsSpec.scala rename to core-s3/src/test/scala/org/apache/pekko/stream/connectors/s3/GeneratorsSpec.scala index c69cee86..8a31ae0e 100644 --- a/core-s3/src/test/scala/akka/stream/alpakka/s3/GeneratorsSpec.scala +++ b/core-s3/src/test/scala/org/apache/pekko/stream/connectors/s3/GeneratorsSpec.scala @@ -1,4 +1,4 @@ -package akka.stream.alpakka.s3 +package org.apache.pekko.stream.connectors.s3 import com.typesafe.config.Config import com.typesafe.config.ConfigFactory diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index dcf64d1f..c667721f 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -1,7 +1,7 @@ # See https://github.com/akka/akka-http/issues/3201 and https://discuss.lightbend.com/t/about-nomoreelementsneeded-exception/8599 -akka.http.client.stream-cancellation-delay = 1000 millis -akka.http.client.stream-cancellation-delay = ${?AKKA_HTTP_CLIENT_STREAM_CANCELLATION_DELAY} +pekko.http.client.stream-cancellation-delay = 1000 millis +pekko.http.client.stream-cancellation-delay = ${?PEKKO_HTTP_CLIENT_STREAM_CANCELLATION_DELAY} kafka-cluster = { topics = [] diff --git a/core/src/test/resources/application.conf b/core/src/test/resources/application.conf index ac02e6e0..9558a12f 100644 --- a/core/src/test/resources/application.conf +++ b/core/src/test/resources/application.conf @@ -1,4 +1,4 @@ -akka { +pekko { log-dead-letters-during-shutdown = false log-dead-letters = 0 } diff --git a/core/src/test/scala/io/aiven/guardian/kafka/KafkaClusterTest.scala b/core/src/test/scala/io/aiven/guardian/kafka/KafkaClusterTest.scala index 53bb1df4..7ce45626 100644 --- a/core/src/test/scala/io/aiven/guardian/kafka/KafkaClusterTest.scala +++ b/core/src/test/scala/io/aiven/guardian/kafka/KafkaClusterTest.scala @@ -1,20 +1,16 @@ package io.aiven.guardian.kafka -import akka.Done -import akka.kafka.ConsumerSettings -import akka.kafka.ProducerSettings -import akka.kafka.scaladsl.Producer -import akka.stream.scaladsl.Source import com.dimafeng.testcontainers.ForAllTestContainer import com.dimafeng.testcontainers.KafkaContainer -import io.aiven.guardian.akka.AkkaStreamTestKit import io.aiven.guardian.kafka.TestUtils.KafkaFutureToCompletableFuture +import io.aiven.guardian.pekko.PekkoStreamTestKit import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.admin.AdminClient import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.serialization.ByteArraySerializer +import org.apache.pekko import org.scalatest.Suite import scala.concurrent.ExecutionContext @@ -25,11 +21,17 @@ import scala.jdk.CollectionConverters._ import scala.jdk.FutureConverters._ import scala.language.postfixOps -trait KafkaClusterTest extends ForAllTestContainer with AkkaStreamTestKit { this: Suite => +import pekko.Done +import pekko.kafka.ConsumerSettings +import pekko.kafka.ProducerSettings +import pekko.kafka.scaladsl.Producer +import pekko.stream.scaladsl.Source - /** Timeout constant to wait for both Akka Streams plus initialization of consumer/kafka cluster +trait KafkaClusterTest extends ForAllTestContainer with PekkoStreamTestKit { this: Suite => + 
+ /** Timeout constant to wait for both Pekko Streams and the consumer/Kafka cluster to initialize */ - val KafkaInitializationTimeoutConstant: FiniteDuration = AkkaStreamInitializationConstant + (2.5 seconds) + val KafkaInitializationTimeoutConstant: FiniteDuration = PekkoStreamInitializationConstant + (2.5 seconds) override lazy val container: KafkaContainer = new KafkaContainer() @@ -72,7 +74,7 @@ trait KafkaClusterTest extends ForAllTestContainer with AkkaStreamTestKit { this def sendTopicAfterTimePeriod(duration: FiniteDuration, producerSettings: ProducerSettings[Array[Byte], Array[Byte]], topic: String - ): Future[Done] = akka.pattern.after(duration) { + ): Future[Done] = pekko.pattern.after(duration) { Source( List( new ProducerRecord[Array[Byte], Array[Byte]](topic, "1".getBytes, "1".getBytes) diff --git a/core/src/test/scala/io/aiven/guardian/kafka/TestUtils.scala b/core/src/test/scala/io/aiven/guardian/kafka/TestUtils.scala index f5e9ab00..5321eb39 100644 --- a/core/src/test/scala/io/aiven/guardian/kafka/TestUtils.scala +++ b/core/src/test/scala/io/aiven/guardian/kafka/TestUtils.scala @@ -1,8 +1,8 @@ package io.aiven.guardian.kafka -import akka.actor.ActorSystem import com.typesafe.scalalogging.LazyLogging import org.apache.kafka.common.KafkaFuture +import org.apache.pekko import scala.collection.immutable import scala.collection.mutable @@ -17,6 +17,8 @@ import java.time.OffsetDateTime import java.time.temporal.ChronoUnit import java.util.concurrent.CompletableFuture +import pekko.actor.ActorSystem + object TestUtils { // Taken from https://stackoverflow.com/a/56763206/1519631 @@ -85,7 +87,7 @@ object TestUtils { if (BigDecimal(current) / BigDecimal(max) * BigDecimal(100) <= buffer) Future.successful(()) else - akka.pattern.after(previousChronoUnit.getDuration.toScala)(recurseUntilHitTimeUnit(previousChronoUnit, buffer)) + pekko.pattern.after(previousChronoUnit.getDuration.toScala)(recurseUntilHitTimeUnit(previousChronoUnit, buffer)) } def waitForStartOfTimeUnit(chronoUnit: ChronoUnit, buffer: BigDecimal = BigDecimal(5))(implicit diff --git a/core/src/test/scala/io/aiven/guardian/akka/AnyPropTestKit.scala b/core/src/test/scala/io/aiven/guardian/pekko/AnyPropTestKit.scala similarity index 58% rename from core/src/test/scala/io/aiven/guardian/akka/AnyPropTestKit.scala rename to core/src/test/scala/io/aiven/guardian/pekko/AnyPropTestKit.scala index f6eaf38c..2c610d26 100644 --- a/core/src/test/scala/io/aiven/guardian/akka/AnyPropTestKit.scala +++ b/core/src/test/scala/io/aiven/guardian/pekko/AnyPropTestKit.scala @@ -1,9 +1,11 @@ -package io.aiven.guardian.akka +package io.aiven.guardian.pekko -import akka.actor.ActorSystem -import akka.testkit.TestKitBase +import org.apache.pekko import org.scalatest.propspec.AnyPropSpec +import pekko.actor.ActorSystem +import pekko.testkit.TestKitBase + class AnyPropTestKit(_system: ActorSystem) extends AnyPropSpec with TestKitBase { implicit val system: ActorSystem = _system } diff --git a/core/src/test/scala/io/aiven/guardian/akka/AkkaHttpTestKit.scala b/core/src/test/scala/io/aiven/guardian/pekko/PekkoHttpTestKit.scala similarity index 56% rename from core/src/test/scala/io/aiven/guardian/akka/AkkaHttpTestKit.scala rename to core/src/test/scala/io/aiven/guardian/pekko/PekkoHttpTestKit.scala index ffbdc8b1..2a887959 100644 --- a/core/src/test/scala/io/aiven/guardian/akka/AkkaHttpTestKit.scala +++ b/core/src/test/scala/io/aiven/guardian/pekko/PekkoHttpTestKit.scala @@ -1,10 +1,12 @@ -package io.aiven.guardian.akka +package 
io.aiven.guardian.pekko -import akka.actor.ActorSystem -import akka.http.scaladsl.Http +import org.apache.pekko import org.scalatest.Suite -trait AkkaHttpTestKit extends AkkaStreamTestKit { this: Suite => +import pekko.actor.ActorSystem +import pekko.http.scaladsl.Http + +trait PekkoHttpTestKit extends PekkoStreamTestKit { this: Suite => implicit val system: ActorSystem override protected def afterAll(): Unit = diff --git a/core/src/test/scala/io/aiven/guardian/akka/AkkaStreamTestKit.scala b/core/src/test/scala/io/aiven/guardian/pekko/PekkoStreamTestKit.scala similarity index 61% rename from core/src/test/scala/io/aiven/guardian/akka/AkkaStreamTestKit.scala rename to core/src/test/scala/io/aiven/guardian/pekko/PekkoStreamTestKit.scala index 1bcd54ba..469a23b1 100644 --- a/core/src/test/scala/io/aiven/guardian/akka/AkkaStreamTestKit.scala +++ b/core/src/test/scala/io/aiven/guardian/pekko/PekkoStreamTestKit.scala @@ -1,15 +1,17 @@ -package io.aiven.guardian.akka +package io.aiven.guardian.pekko -import akka.actor.ActorSystem -import akka.testkit.TestKit -import akka.testkit.TestKitBase +import org.apache.pekko import org.scalatest.BeforeAndAfterAll import org.scalatest.Suite import scala.concurrent.duration._ import scala.language.postfixOps -trait AkkaStreamTestKit extends TestKitBase with BeforeAndAfterAll { this: Suite => +import pekko.actor.ActorSystem +import pekko.testkit.TestKit +import pekko.testkit.TestKitBase + +trait PekkoStreamTestKit extends TestKitBase with BeforeAndAfterAll { this: Suite => implicit val system: ActorSystem override protected def afterAll(): Unit = @@ -18,5 +20,5 @@ trait AkkaStreamTestKit extends TestKitBase with BeforeAndAfterAll { this: Suite /** If its not possible to determine whether a Stream has finished in a test and instead you need to use a manual * wait, make sure you wait at least this period of time for akka-streams to initialize properly. 
*/ - val AkkaStreamInitializationConstant: FiniteDuration = 1 second + val PekkoStreamInitializationConstant: FiniteDuration = 1 second } diff --git a/docs/src/main/paradox/backup/configuration.md b/docs/src/main/paradox/backup/configuration.md index 3588fe98..6bb8af21 100644 --- a/docs/src/main/paradox/backup/configuration.md +++ b/docs/src/main/paradox/backup/configuration.md @@ -8,8 +8,8 @@ Scala API doc @apidoc[kafka.backup.configs.Backup] ## Explanation -* `akka.kafka.consumer`: See @extref:[documentation](alpakka-kafka-docs:consumer.html#settings) -* `akka.kafka.consumer.kafka-clients`: See @extref:[documentation](kafka-docs:documentation.html#consumerconfigs) +* `pekko.kafka.consumer`: See @extref:[documentation](pekko-connectors-kafka-docs:consumer.html#settings) +* `pekko.kafka.consumer.kafka-clients`: See @extref:[documentation](kafka-docs:documentation.html#consumerconfigs) * `backup`: * `kafka-group-id`: The group id for the Kafka consumer that's used in the restore tool * `time-configuration`: How to slice the persisted keys/files by time diff --git a/docs/src/main/paradox/general-architecture/logging.md b/docs/src/main/paradox/general-architecture/logging.md index bbe8899a..ea313b0a 100644 --- a/docs/src/main/paradox/general-architecture/logging.md +++ b/docs/src/main/paradox/general-architecture/logging.md @@ -13,16 +13,16 @@ from either the @github[cli](/core-cli/src/main/resources/logback.xml) or the @@@ warning -As documented at @extref:[akka logback configuration](akka-docs:logging.html#logback-configuration) it is highly recommended +As documented at @extref:[pekko logback configuration](pekko-docs:logging.html#logback-configuration) it is highly recommended to use an `AsyncAppender` in your configuration as this offloads the logging to a background thread; otherwise you will -end up blocking the core akka/akka-streams library whenever a log is made. +end up blocking the core pekko/pekko-streams library whenever a log is made. @@@ -## Logback adapter for akka/akka-streams +## Logback adapter for pekko/pekko-streams -By default, akka/akka-streams uses its own asynchronous logger however they provide a -@extref:[logging adapter](akka-docs:logging.html#slf4j) which has already been preconfigured for use in Guardian. +By default, pekko/pekko-streams uses its own asynchronous logger; however, it provides a +@extref:[logging adapter](pekko-docs:logging.html#slf4j) which has already been preconfigured for use in Guardian. ## CLI/Application diff --git a/docs/src/main/paradox/index.md b/docs/src/main/paradox/index.md index d26a95e4..4507f788 100644 --- a/docs/src/main/paradox/index.md +++ b/docs/src/main/paradox/index.md @@ -2,7 +2,7 @@ Guardian for Apache Kafka is an open source utility for backing up [Apache Kafka](https://kafka.apache.org/) clusters. It is built using [Scala](https://www.scala-lang.org/) entirely -with [Akka-Streams](https://doc.akka.io/docs/akka/current/stream/index.html) +with [Pekko-Streams](https://pekko.apache.org/docs/pekko/current/stream/index.html) to ensure that the tool runs reliably and as desired with large datasets in different scenarios. @@toc { depth=2 } diff --git a/docs/src/main/paradox/overview.md b/docs/src/main/paradox/overview.md index 1e8170a0..c710412b 100644 --- a/docs/src/main/paradox/overview.md +++ b/docs/src/main/paradox/overview.md @@ -2,15 +2,15 @@ Guardian for Apache Kafka is an open source utility for backing up [Apache Kafka](https://kafka.apache.org/) clusters.
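The logging hunk above recommends an `AsyncAppender` and notes that the SLF4J adapter is already preconfigured in Guardian; a hedged sketch of what that wiring amounts to, assuming the `pekko-slf4j` module is on the classpath (the system name is illustrative, and the config keys shown are the standard Pekko ones rather than copied from Guardian's config):

```scala
import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.actor.ActorSystem

object Slf4jAdapterExample extends App {
  // Route Pekko's internal logging through SLF4J/Logback so the AsyncAppender
  // advice above also covers logs emitted by pekko/pekko-streams itself.
  val config = ConfigFactory
    .parseString("""
      pekko.loggers = ["org.apache.pekko.event.slf4j.Slf4jLogger"]
      pekko.logging-filter = "org.apache.pekko.event.slf4j.Slf4jLoggingFilter"
    """)
    .withFallback(ConfigFactory.load())

  val system: ActorSystem = ActorSystem("Slf4jAdapterExample", config)
  system.log.info("Pekko logging now goes through SLF4J")
  system.terminate()
}
```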
It is built using [Scala](https://www.scala-lang.org/) entirely -with [Akka-Streams](https://doc.akka.io/docs/akka/current/stream/index.html) +with [Pekko-Streams](https://pekko.apache.org/docs/pekko/current/stream/index.html) to ensure that the tool runs as desired with large datasets in different scenarios. ## Versions The core modules are compiled against: -* Akka Streams $akka.version$+ (@extref:[Reference](akka-docs:stream/index.html), [Github](https://github.com/akka/akka)) -* Akka Streams Json $akka-streams-json.version$+ ([Github](https://github.com/mdedetrich/akka-streams-json)) +* Pekko Streams $pekko.version$+ (@extref:[Reference](pekko-docs:stream/index.html), [Github](https://github.com/apache/incubator-pekko)) +* Pekko Streams Circe $pekko-stream-circe.version$+ ([Github](https://github.com/mdedetrich/pekko-streams-circe)) * PureConfig $pure-config.version$+ ([Reference](https://pureconfig.github.io/docs/), [Github](https://github.com/pureconfig/pureconfig)) * ScalaLogging $scala-logging.version$+ ([Github](https://github.com/lightbend/scala-logging)) diff --git a/docs/src/main/paradox/persistence/design.md b/docs/src/main/paradox/persistence/design.md index c282b13d..233db86f 100644 --- a/docs/src/main/paradox/persistence/design.md +++ b/docs/src/main/paradox/persistence/design.md @@ -22,5 +22,5 @@ just need to handle how to store/push `ByteString` into the storage of your choi The @apidoc[RestoreClientInterface] implements restoration from an existing backup. Implementing this is quite simple: you need to define @apidoc[RestoreClientInterface.retrieveBackupKeys](RestoreClientInterface) which returns all valid keys to restore (i.e. don't include currently-in-progress backup keys) and -@apidoc[RestoreClientInterface.downloadFlow](RestoreClientInterface) which is an akka-stream `Flow` that takes +@apidoc[RestoreClientInterface.downloadFlow](RestoreClientInterface) which is a pekko-stream `Flow` that takes a `String` key and outputs the content of that key. diff --git a/docs/src/main/paradox/persistence/s3/configuration.md b/docs/src/main/paradox/persistence/s3/configuration.md index b5b8a57a..a08f26a8 100644 --- a/docs/src/main/paradox/persistence/s3/configuration.md +++ b/docs/src/main/paradox/persistence/s3/configuration.md @@ -8,9 +8,9 @@ Scala API doc @apidoc[kafka.s3.configs.S3] ## Explanation -* `s3-headers`: See @extref:[documentation](alpakka:akka/stream/alpakka/s3/headers/index.html) -* `alpakka.s3`: See @extref:[documentation](alpakka-docs:s3.html#configuration) +* `s3-headers`: See @extref:[documentation](pekko-connectors:org/apache/pekko/stream/connectors/s3/headers/index.html) +* `pekko.connectors.s3`: See @extref:[documentation](pekko-connectors-docs:s3.html#configuration) * `s3-config`: Core S3 configuration * `data-bucket`: The main S3 bucket where data is backed up and where to restore data from * `data-bucket-prefix`: S3 prefix configuration to be used when searching for the bucket * `error-restart-settings`: Specific retry settings when recovering from known errors in S3.
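The `error-restart-settings` entry above materialises as a Pekko Streams `RestartSettings` value. A minimal sketch of constructing one, assuming pekko-stream on the classpath (the backoff numbers are illustrative, not Guardian's defaults):

```scala
import org.apache.pekko
import pekko.stream.RestartSettings

import scala.concurrent.duration._

object RestartSettingsExample {
  // Illustrative values: exponential backoff between 3 and 30 seconds with
  // 20% jitter, giving up after 10 restarts inside a 5 minute window.
  val errorRestartSettings: RestartSettings =
    RestartSettings(minBackoff = 3.seconds, maxBackoff = 30.seconds, randomFactor = 0.2)
      .withMaxRestarts(10, 5.minutes)
}
```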
See @extref:[apidoc](pekko:org/apache/pekko/stream/RestartSettings.html) diff --git a/docs/src/main/paradox/restore/configuration.md b/docs/src/main/paradox/restore/configuration.md index 7a258654..c3ca37c0 100644 --- a/docs/src/main/paradox/restore/configuration.md +++ b/docs/src/main/paradox/restore/configuration.md @@ -8,8 +8,8 @@ Scala API doc @apidoc[kafka.restore.configs.Restore] ## Explanation -* `akka.kafka.producer`: See @extref:[documentation](alpakka-kafka-docs:producer.html#settings) -* `akka.kafka.producer.kafka-clients`: See @extref:[documentation](kafka-docs:documentation.html#producerconfigs) +* `pekko.kafka.producer`: See @extref:[documentation](pekko-connectors-kafka-docs:producer.html#settings) +* `pekko.kafka.producer.kafka-clients`: See @extref:[documentation](kafka-docs:documentation.html#producerconfigs) * `restore`: * `from-when`: An `ISO-8601` time that specifies from when topics need to be restored. Note that the time used is based on the original Kafka timestamp and **NOT** the current time. diff --git a/docs/src/main/paradox/testing/index.md b/docs/src/main/paradox/testing/index.md index da91202c..f59f67e3 100644 --- a/docs/src/main/paradox/testing/index.md +++ b/docs/src/main/paradox/testing/index.md @@ -16,8 +16,8 @@ using this testing framework are * It provides very handy utilities for testing asynchronous code, for example a @extref:[PatienceConfig](scalatest:concurrent/AbstractPatienceConfiguration$PatienceConfig.html) that provides efficient polling of Scala futures with configurable, scalable timeouts and intervals. -* Akka provides @extref:[Testkit](akka-docs:testing.html#asynchronous-testing-testkit) with direct integration into - ScalaTest for easy testing of akka-streams. +* Pekko provides @extref:[Testkit](pekko-docs:testing.html#asynchronous-testing-testkit) with direct integration into + ScalaTest for easy testing of pekko-streams. ### Property based tests diff --git a/docs/src/main/paradox/testing/s3.md b/docs/src/main/paradox/testing/s3.md index 8f2da741..4c05f989 100644 --- a/docs/src/main/paradox/testing/s3.md +++ b/docs/src/main/paradox/testing/s3.md @@ -4,11 +4,11 @@ For tests that run against the [AWS S3 service](https://aws.amazon.com/s3/) you to S3. The most typical way to provide these credentials is with the usage of environment variables, e.g.
```shell -export ALPAKKA_S3_AWS_CREDENTIALS_PROVIDER=static -export ALPAKKA_S3_AWS_CREDENTIALS_ACCESS_KEY_ID="my key" -export ALPAKKA_S3_AWS_CREDENTIALS_SECRET_ACCESS_KEY="my secret" -export ALPAKKA_S3_REGION_PROVIDER=static -export ALPAKKA_S3_REGION_DEFAULT_REGION=eu-central-1 +export PEKKO_CONNECTORS_S3_AWS_CREDENTIALS_PROVIDER=static +export PEKKO_CONNECTORS_S3_AWS_CREDENTIALS_ACCESS_KEY_ID="my key" +export PEKKO_CONNECTORS_S3_AWS_CREDENTIALS_SECRET_ACCESS_KEY="my secret" +export PEKKO_CONNECTORS_S3_REGION_PROVIDER=static +export PEKKO_CONNECTORS_S3_REGION_DEFAULT_REGION=eu-central-1 ``` ## Utilities diff --git a/restore-s3/src/main/scala/io/aiven/guardian/kafka/restore/s3/RestoreClient.scala b/restore-s3/src/main/scala/io/aiven/guardian/kafka/restore/s3/RestoreClient.scala index 6d68d89d..e1d74f6b 100644 --- a/restore-s3/src/main/scala/io/aiven/guardian/kafka/restore/s3/RestoreClient.scala +++ b/restore-s3/src/main/scala/io/aiven/guardian/kafka/restore/s3/RestoreClient.scala @@ -1,23 +1,25 @@ package io.aiven.guardian.kafka.restore.s3 -import akka.NotUsed -import akka.actor.ActorSystem -import akka.stream.alpakka.s3.S3Attributes -import akka.stream.alpakka.s3.S3Headers -import akka.stream.alpakka.s3.S3Settings -import akka.stream.alpakka.s3.scaladsl.S3 -import akka.stream.scaladsl.Flow -import akka.stream.scaladsl.Sink -import akka.util.ByteString import io.aiven.guardian.kafka.configs.KafkaCluster import io.aiven.guardian.kafka.restore.KafkaProducerInterface import io.aiven.guardian.kafka.restore.RestoreClientInterface import io.aiven.guardian.kafka.restore.configs.Restore import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config} +import org.apache.pekko import scala.concurrent.ExecutionContext import scala.concurrent.Future +import pekko.NotUsed +import pekko.actor.ActorSystem +import pekko.stream.connectors.s3.S3Attributes +import pekko.stream.connectors.s3.S3Headers +import pekko.stream.connectors.s3.S3Settings +import pekko.stream.connectors.s3.scaladsl.S3 +import pekko.stream.scaladsl.Flow +import pekko.stream.scaladsl.Sink +import pekko.util.ByteString + class RestoreClient[T <: KafkaProducerInterface](maybeS3Settings: Option[S3Settings])(implicit override val kafkaProducerInterface: T, override val restoreConfig: Restore, diff --git a/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3GzipCompressionRestoreClientSpec.scala b/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3GzipCompressionRestoreClientSpec.scala index dce89412..236e43b0 100644 --- a/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3GzipCompressionRestoreClientSpec.scala +++ b/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3GzipCompressionRestoreClientSpec.scala @@ -1,9 +1,9 @@ package io.aiven.guardian.kafka.restore.s3 -import akka.actor.ActorSystem -import io.aiven.guardian.akka.AnyPropTestKit import io.aiven.guardian.kafka.backup.configs.Compression import io.aiven.guardian.kafka.models.Gzip +import io.aiven.guardian.pekko.AnyPropTestKit +import org.apache.pekko.actor.ActorSystem class RealS3GzipCompressionRestoreClientSpec extends AnyPropTestKit(ActorSystem("RealS3GzipCompressionRestoreClientSpec")) diff --git a/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientSpec.scala b/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientSpec.scala index 8a7c7f46..d1adfc5b 100644 --- a/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientSpec.scala +++ 
b/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientSpec.scala @@ -1,8 +1,8 @@ package io.aiven.guardian.kafka.restore.s3 -import akka.actor.ActorSystem -import io.aiven.guardian.akka.AnyPropTestKit import io.aiven.guardian.kafka.backup.configs.Compression +import io.aiven.guardian.pekko.AnyPropTestKit +import org.apache.pekko.actor.ActorSystem class RealS3RestoreClientSpec extends AnyPropTestKit(ActorSystem("RealS3RestoreClientSpec")) diff --git a/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientTest.scala b/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientTest.scala index 59158eb8..25bb3013 100644 --- a/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientTest.scala +++ b/restore-s3/src/test/scala/io/aiven/guardian/kafka/restore/s3/RealS3RestoreClientTest.scala @@ -1,13 +1,5 @@ package io.aiven.guardian.kafka.restore.s3 -import akka.kafka.ConsumerSettings -import akka.kafka.Subscriptions -import akka.kafka.scaladsl.Consumer -import akka.kafka.scaladsl.Consumer.DrainingControl -import akka.kafka.scaladsl.Producer -import akka.stream.alpakka.s3.S3Settings -import akka.stream.alpakka.s3.scaladsl.S3 -import akka.stream.scaladsl.Sink import com.softwaremill.diffx.scalatest.DiffMustMatcher._ import io.aiven.guardian.kafka.Generators._ import io.aiven.guardian.kafka.KafkaClusterTest @@ -25,6 +17,7 @@ import io.aiven.guardian.kafka.s3.Generators.s3ConfigGen import io.aiven.guardian.kafka.s3.S3Spec import io.aiven.guardian.kafka.s3.configs.{S3 => S3Config} import org.apache.kafka.common.serialization.ByteArrayDeserializer +import org.apache.pekko import org.scalatest.matchers.must.Matchers import org.scalatest.propspec.AnyPropSpecLike import org.scalatestplus.scalacheck.ScalaCheckPropertyChecks @@ -34,6 +27,15 @@ import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration._ import scala.language.postfixOps +import pekko.kafka.ConsumerSettings +import pekko.kafka.Subscriptions +import pekko.kafka.scaladsl.Consumer +import pekko.kafka.scaladsl.Consumer.DrainingControl +import pekko.kafka.scaladsl.Producer +import pekko.stream.connectors.s3.S3Settings +import pekko.stream.connectors.s3.scaladsl.S3 +import pekko.stream.scaladsl.Sink + trait RealS3RestoreClientTest extends AnyPropSpecLike with S3Spec @@ -107,7 +109,7 @@ trait RealS3RestoreClientTest _ <- createTopics(kafkaDataInChunksWithTimePeriodRenamedTopics.topics) _ <- createBucket(s3Config.dataBucket) _ = backupClientWrapped.run() - _ <- akka.pattern.after(KafkaInitializationTimeoutConstant)( + _ <- pekko.pattern.after(KafkaInitializationTimeoutConstant)( baseSource .runWith(Producer.plainSink(producerSettings)) ) @@ -127,11 +129,11 @@ trait RealS3RestoreClientTest .plainSource(restoreResultConsumerTopicSettings, Subscriptions.topics(renamedTopics)) eventualRestoredTopics = restoreResultConsumerSource.toMat(Sink.collection)(DrainingControl.apply).run() _ <- createTopics(renamedTopics) - _ <- akka.pattern.after(5 seconds) { + _ <- pekko.pattern.after(5 seconds) { val (_, future) = restoreClient.restore.run() future } - receivedTopics <- akka.pattern.after(1 minute)(eventualRestoredTopics.drainAndShutdown()) + receivedTopics <- pekko.pattern.after(1 minute)(eventualRestoredTopics.drainAndShutdown()) asConsumerRecords = receivedTopics.map(KafkaConsumer.consumerRecordToReducedConsumerRecord) } yield asConsumerRecords.toList
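The `pekko.pattern.after` calls in the test above are the standard non-blocking way to delay a `Future`: the actor system's scheduler does the waiting instead of a sleeping thread. A self-contained sketch under that assumption (object and value names are illustrative):

```scala
import org.apache.pekko
import pekko.actor.ActorSystem

import scala.concurrent.Future
import scala.concurrent.duration._

object AfterPatternExample extends App {
  implicit val system: ActorSystem = ActorSystem("AfterPatternExample")
  import system.dispatcher

  // The by-name Future only starts once the delay has elapsed; the scheduler
  // does the waiting, so no thread is blocked in the meantime.
  val delayed: Future[Int] = pekko.pattern.after(2.seconds)(Future(21 * 2))

  delayed.foreach { value =>
    println(s"computed $value after the delay")
    system.terminate()
  }
}
```

This mirrors how the test delays producing records until the Kafka container and streams have had time to initialize, without tying up a thread the way `Thread.sleep` would.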