From 9f74250c601648ec3a9769de06eb7f0421442261 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Mon, 29 Jul 2024 23:16:42 +0200 Subject: [PATCH] Don't use Scala Futures in Java APIs Sketching out #1417 - incomplete and notably not bothering with binary compatibility yet, just to illustrate the idea. --- .../pekko/dispatch/JavaFutureTests.java | 1 + .../apache/pekko/pattern/PatternsTest.java | 56 +++++++++---------- .../org/apache/pekko/dispatch/Future.scala | 7 +++ .../java/jdocs/future/ActorWithFuture.java | 5 +- .../test/java/jdocs/future/FutureDocTest.java | 42 +++++++------- .../LambdaPersistencePluginDocTest.java | 24 ++++---- .../test/java/jdocs/stream/FlowDocTest.java | 5 +- .../cookbook/RecipeAdhocSourceTest.java | 29 +++++----- .../journal/japi/AsyncWritePlugin.java | 8 +-- .../snapshot/japi/SnapshotStorePlugin.java | 9 +-- .../journal/japi/AsyncWriteJournal.scala | 8 ++- .../snapshot/japi/SnapshotStore.scala | 9 +-- 12 files changed, 107 insertions(+), 96 deletions(-) diff --git a/actor-tests/src/test/java/org/apache/pekko/dispatch/JavaFutureTests.java b/actor-tests/src/test/java/org/apache/pekko/dispatch/JavaFutureTests.java index 6af8c2b399c..d5f9b193b4c 100644 --- a/actor-tests/src/test/java/org/apache/pekko/dispatch/JavaFutureTests.java +++ b/actor-tests/src/test/java/org/apache/pekko/dispatch/JavaFutureTests.java @@ -36,6 +36,7 @@ import org.apache.pekko.testkit.PekkoSpec; +@SuppressWarnings("deprecation") public class JavaFutureTests extends JUnitSuite { @ClassRule diff --git a/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java b/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java index aa75a4f3a44..937da369674 100644 --- a/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java +++ b/actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java @@ -19,6 +19,7 @@ import org.apache.pekko.testkit.PekkoSpec; import org.apache.pekko.testkit.TestProbe; import org.apache.pekko.util.Timeout; +import org.apache.pekko.util.FutureConverters; import org.junit.ClassRule; import org.junit.Test; import org.scalatestplus.junit.JUnitSuite; @@ -219,7 +220,7 @@ public void testAskWithReplyToTimeout() throws Exception { @Test public void usePipe() throws Exception { TestProbe probe = new TestProbe(system); - pipe(Futures.successful("ho!"), system.dispatcher()).to(probe.ref()); + pipe(CompletableFuture.completedFuture("ho!"), system.dispatcher()).to(probe.ref()); probe.expectMsg("ho!"); } @@ -227,7 +228,7 @@ public void usePipe() throws Exception { public void usePipeWithActorSelection() throws Exception { TestProbe probe = new TestProbe(system); ActorSelection selection = system.actorSelection(probe.ref().path()); - pipe(Futures.successful("hi!"), system.dispatcher()).to(selection); + pipe(CompletableFuture.completedFuture("hi!"), system.dispatcher()).to(selection); probe.expectMsg("hi!"); } @@ -291,15 +292,11 @@ public void testRetryCompletionStageRandomDelay() throws Exception { public void testRetry() throws Exception { final String expected = "hello"; - Future retriedFuture = + CompletionStage retriedFuture = Patterns.retry( - () -> Futures.successful(expected), - 3, - scala.concurrent.duration.Duration.apply(200, "millis"), - system.scheduler(), - ec); + () -> CompletableFuture.completedFuture(expected), 3, Duration.ofMillis(200), system); - String actual = Await.result(retriedFuture, FiniteDuration.apply(3, SECONDS)); + String actual = retriedFuture.toCompletableFuture().get(3, SECONDS); assertEquals(expected, actual); } @@ -317,21 +314,21 @@ public void testCSRetry() throws Exception { } @Test(expected = IllegalStateException.class) - public void testAfterFailedCallable() throws Exception { - Callable> failedCallable = - () -> Futures.failed(new IllegalStateException("Illegal!")); + public void testAfterFailedCallable() throws Throwable { + Callable> failedCallable = + () -> Futures.failedCompletionStage(new IllegalStateException("Illegal!")); - Future delayedFuture = - Patterns.after( - scala.concurrent.duration.Duration.create(200, "millis"), - system.scheduler(), - ec, - failedCallable); + CompletionStage delayedFuture = + Patterns.after(Duration.ofMillis(200), system, failedCallable); - Future resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec); - Await.result(resultFuture, scala.concurrent.duration.FiniteDuration.apply(3, SECONDS)); + try { + delayedFuture.toCompletableFuture().get(3, SECONDS); + } catch (ExecutionException e) { + throw e.getCause(); + } } + @SuppressWarnings("deprecation") @Test(expected = IllegalStateException.class) public void testAfterFailedFuture() throws Exception { @@ -340,7 +337,9 @@ public void testAfterFailedFuture() throws Exception { scala.concurrent.duration.Duration.create(200, "millis"), system.scheduler(), ec, - () -> Futures.failed(new IllegalStateException("Illegal!"))); + () -> + FutureConverters.asScala( + Futures.failedCompletionStage(new IllegalStateException("Illegal!")))); Future resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec); Await.result(resultFuture, FiniteDuration.apply(3, SECONDS)); @@ -350,19 +349,16 @@ public void testAfterFailedFuture() throws Exception { public void testAfterSuccessfulCallable() throws Exception { final String expected = "Hello"; - Future delayedFuture = + CompletionStage delayedFuture = Patterns.after( - scala.concurrent.duration.Duration.create(200, "millis"), - system.scheduler(), - ec, - () -> Futures.successful(expected)); + Duration.ofMillis(200), system, () -> CompletableFuture.completedFuture(expected)); - Future resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec); - final String actual = Await.result(resultFuture, FiniteDuration.apply(3, SECONDS)); + String actual = delayedFuture.toCompletableFuture().get(3, SECONDS); assertEquals(expected, actual); } + @SuppressWarnings("deprecation") @Test public void testAfterSuccessfulFuture() throws Exception { final String expected = "Hello"; @@ -380,6 +376,7 @@ public void testAfterSuccessfulFuture() throws Exception { assertEquals(expected, actual); } + @SuppressWarnings("deprecation") @Test public void testAfterFiniteDuration() throws Exception { final String expected = "Hello"; @@ -391,7 +388,8 @@ public void testAfterFiniteDuration() throws Exception { ec, () -> Futures.successful("world")); - Future immediateFuture = Futures.future(() -> expected, ec); + Future immediateFuture = + FutureConverters.asScala(CompletableFuture.completedFuture(expected)); Future resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture, immediateFuture), ec); diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/Future.scala b/actor/src/main/scala/org/apache/pekko/dispatch/Future.scala index 3099bcf6f93..a458cf9c846 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/Future.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/Future.scala @@ -130,6 +130,7 @@ object Futures { * @param executor the execution context on which the future is run * @return the `Future` holding the result of the computation */ + @deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0") def future[T](body: Callable[T], executor: ExecutionContext): Future[T] = Future(body.call)(executor) /** @@ -137,20 +138,25 @@ object Futures { * * @return the newly created `Promise` object */ + @deprecated("Use CompletableFuture instead", "1.1.0") def promise[T](): Promise[T] = Promise[T]() /** * creates an already completed Promise with the specified exception */ + @deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0") def failed[T](exception: Throwable): Future[T] = Future.failed(exception) /** * Creates an already completed Promise with the specified result */ + @deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0") def successful[T](result: T): Future[T] = Future.successful(result) /** * Creates an already completed CompletionStage with the specified exception + * + * Note: prefer CompletableFuture.failedStage(ex) from Java 9 onwards */ def failedCompletionStage[T](ex: Throwable): CompletionStage[T] = { val f = CompletableFuture.completedFuture[T](null.asInstanceOf[T]) @@ -172,6 +178,7 @@ object Futures { /** * Returns a Future to the result of the first future in the list that is completed */ + @deprecated("Use CompletableFuture.anyOf instead", "1.1.0") def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], executor: ExecutionContext): Future[T] = Future.firstCompletedOf(futures.asScala)(executor) diff --git a/docs/src/test/java/jdocs/future/ActorWithFuture.java b/docs/src/test/java/jdocs/future/ActorWithFuture.java index 9600b44011b..19553f1a1ce 100644 --- a/docs/src/test/java/jdocs/future/ActorWithFuture.java +++ b/docs/src/test/java/jdocs/future/ActorWithFuture.java @@ -15,11 +15,12 @@ // #context-dispatcher import org.apache.pekko.actor.AbstractActor; -import org.apache.pekko.dispatch.Futures; + +import java.util.concurrent.CompletableFuture; public class ActorWithFuture extends AbstractActor { ActorWithFuture() { - Futures.future(() -> "hello", getContext().dispatcher()); + CompletableFuture.supplyAsync(() -> "hello", getContext().dispatcher()); } @Override diff --git a/docs/src/test/java/jdocs/future/FutureDocTest.java b/docs/src/test/java/jdocs/future/FutureDocTest.java index 5627b1d4e12..98e5fe7cf60 100644 --- a/docs/src/test/java/jdocs/future/FutureDocTest.java +++ b/docs/src/test/java/jdocs/future/FutureDocTest.java @@ -14,27 +14,18 @@ package jdocs.future; import org.apache.pekko.actor.typed.ActorSystem; -import org.apache.pekko.dispatch.Futures; import org.apache.pekko.pattern.Patterns; import org.apache.pekko.testkit.PekkoJUnitActorSystemResource; import org.apache.pekko.testkit.PekkoSpec; import org.apache.pekko.util.Timeout; -import org.apache.pekko.util.FutureConverters; import jdocs.AbstractJavaTest; import org.junit.ClassRule; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; import java.time.Duration; -import java.util.Arrays; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; +import java.util.concurrent.*; import static org.apache.pekko.actor.typed.javadsl.Adapter.toTyped; -import static org.apache.pekko.dispatch.Futures.future; // #imports // #imports @@ -48,9 +39,9 @@ public class FutureDocTest extends AbstractJavaTest { private final ActorSystem system = toTyped(actorSystemResource.getSystem()); - @Test(expected = java.util.concurrent.CompletionException.class) - public void useAfter() throws Exception { - final ExecutionContext ec = system.executionContext(); + @Test(expected = IllegalStateException.class) + public void useAfter() throws Throwable { + final Executor ex = system.executionContext(); // #after CompletionStage failWithException = CompletableFuture.supplyAsync( @@ -60,18 +51,25 @@ public void useAfter() throws Exception { CompletionStage delayed = Patterns.after(Duration.ofMillis(200), system, () -> failWithException); // #after - Future future = - future( + CompletionStage completionStage = + CompletableFuture.supplyAsync( () -> { - Thread.sleep(1000); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } return "foo"; }, - ec); - Future result = - Futures.firstCompletedOf( - Arrays.>asList(future, FutureConverters.asScala(delayed)), ec); - Timeout timeout = Timeout.create(Duration.ofSeconds(2)); - Await.result(result, timeout.duration()); + ex); + CompletableFuture result = + CompletableFuture.anyOf( + completionStage.toCompletableFuture(), delayed.toCompletableFuture()); + try { + result.toCompletableFuture().get(2, SECONDS); + } catch (ExecutionException e) { + throw e.getCause(); + } } @Test diff --git a/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java b/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java index b951163911a..03e86cda481 100644 --- a/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java +++ b/docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java @@ -33,6 +33,9 @@ import org.junit.runner.RunWith; import org.scalatestplus.junit.JUnitRunner; import scala.concurrent.Future; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.Consumer; import org.iq80.leveldb.util.FileUtils; import java.util.Optional; @@ -81,45 +84,46 @@ public Receive createReceive() { class MySnapshotStore extends SnapshotStore { @Override - public Future> doLoadAsync( + public CompletionStage> doLoadAsync( String persistenceId, SnapshotSelectionCriteria criteria) { return null; } @Override - public Future doSaveAsync(SnapshotMetadata metadata, Object snapshot) { + public CompletionStage doSaveAsync(SnapshotMetadata metadata, Object snapshot) { return null; } @Override - public Future doDeleteAsync(SnapshotMetadata metadata) { - return Futures.successful(null); + public CompletionStage doDeleteAsync(SnapshotMetadata metadata) { + return CompletableFuture.completedFuture(null); } @Override - public Future doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) { - return Futures.successful(null); + public CompletionStage doDeleteAsync( + String persistenceId, SnapshotSelectionCriteria criteria) { + return CompletableFuture.completedFuture(null); } } class MyAsyncJournal extends AsyncWriteJournal { // #sync-journal-plugin-api @Override - public Future>> doAsyncWriteMessages( + public CompletionStage>> doAsyncWriteMessages( Iterable messages) { try { Iterable> result = new ArrayList>(); // blocking call here... // result.add(..) - return Futures.successful(result); + return CompletableFuture.completedFuture(result); } catch (Exception e) { - return Futures.failed(e); + return Futures.failedCompletionStage(e); } } // #sync-journal-plugin-api @Override - public Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) { + public CompletionStage doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) { return null; } diff --git a/docs/src/test/java/jdocs/stream/FlowDocTest.java b/docs/src/test/java/jdocs/stream/FlowDocTest.java index 82f04a08bab..56cbbf28b9d 100644 --- a/docs/src/test/java/jdocs/stream/FlowDocTest.java +++ b/docs/src/test/java/jdocs/stream/FlowDocTest.java @@ -19,7 +19,6 @@ import org.apache.pekko.actor.ActorRef; import org.apache.pekko.actor.ActorSystem; import org.apache.pekko.actor.Cancellable; -import org.apache.pekko.dispatch.Futures; import org.apache.pekko.japi.Pair; import org.apache.pekko.stream.*; import org.apache.pekko.stream.javadsl.*; @@ -158,8 +157,8 @@ public void creatingSourcesSinks() throws Exception { list.add(3); Source.from(list); - // Create a source form a Future - Source.future(Futures.successful("Hello Streams!")); + // Create a source form a CompletionStage + Source.completionStage(CompletableFuture.completedFuture("Hello Streams!")); // Create a source from a single element Source.single("only one element"); diff --git a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java index d0cc6f1c2e6..267d340cf82 100644 --- a/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java +++ b/docs/src/test/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java @@ -16,7 +16,6 @@ import org.apache.pekko.Done; import org.apache.pekko.NotUsed; import org.apache.pekko.actor.ActorSystem; -import org.apache.pekko.dispatch.Futures; import org.apache.pekko.japi.pf.PFBuilder; import org.apache.pekko.stream.BackpressureTimeoutException; import org.apache.pekko.stream.javadsl.Keep; @@ -28,10 +27,10 @@ import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import scala.concurrent.Await; -import scala.concurrent.Promise; import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -111,12 +110,12 @@ public void startStream() throws Exception { public void shutdownStream() throws Exception { new TestKit(system) { { - Promise shutdown = Futures.promise(); + CompletableFuture shutdown = new CompletableFuture(); TestSubscriber.Probe probe = adhocSource( Source.repeat("a") .watchTermination( - (a, term) -> term.thenRun(() -> shutdown.success(Done.getInstance()))), + (a, term) -> term.thenRun(() -> shutdown.complete(Done.getInstance()))), duration200mills, 3) .toMat(TestSink.probe(system), Keep.right()) @@ -124,7 +123,7 @@ public void shutdownStream() throws Exception { probe.requestNext("a"); Thread.sleep(300); - Await.result(shutdown.future(), duration("3 seconds")); + shutdown.get(3, TimeUnit.SECONDS); } }; } @@ -134,12 +133,12 @@ public void shutdownStream() throws Exception { public void notShutDownStream() throws Exception { new TestKit(system) { { - Promise shutdown = Futures.promise(); + CompletableFuture shutdown = new CompletableFuture(); TestSubscriber.Probe probe = adhocSource( Source.repeat("a") .watchTermination( - (a, term) -> term.thenRun(() -> shutdown.success(Done.getInstance()))), + (a, term) -> term.thenRun(() -> shutdown.complete(Done.getInstance()))), duration200mills, 3) .toMat(TestSink.probe(system), Keep.right()) @@ -156,7 +155,7 @@ public void notShutDownStream() throws Exception { probe.requestNext("a"); Thread.sleep(100); - assertEquals(false, shutdown.isCompleted()); + assertEquals(false, shutdown.isDone()); } }; } @@ -166,7 +165,7 @@ public void notShutDownStream() throws Exception { public void restartUponDemand() throws Exception { new TestKit(system) { { - Promise shutdown = Futures.promise(); + CompletableFuture shutdown = new CompletableFuture(); AtomicInteger startedCount = new AtomicInteger(0); Source source = @@ -177,7 +176,7 @@ public void restartUponDemand() throws Exception { TestSubscriber.Probe probe = adhocSource( source.watchTermination( - (a, term) -> term.thenRun(() -> shutdown.success(Done.getInstance()))), + (a, term) -> term.thenRun(() -> shutdown.complete(Done.getInstance()))), duration200mills, 3) .toMat(TestSink.probe(system), Keep.right()) @@ -186,7 +185,7 @@ public void restartUponDemand() throws Exception { probe.requestNext("a"); assertEquals(1, startedCount.get()); Thread.sleep(200); - Await.result(shutdown.future(), duration("3 seconds")); + shutdown.get(3, TimeUnit.SECONDS); } }; } @@ -196,7 +195,7 @@ public void restartUponDemand() throws Exception { public void restartUptoMaxRetries() throws Exception { new TestKit(system) { { - Promise shutdown = Futures.promise(); + CompletableFuture shutdown = new CompletableFuture<>(); AtomicInteger startedCount = new AtomicInteger(0); Source source = @@ -207,7 +206,7 @@ public void restartUptoMaxRetries() throws Exception { TestSubscriber.Probe probe = adhocSource( source.watchTermination( - (a, term) -> term.thenRun(() -> shutdown.success(Done.getInstance()))), + (a, term) -> term.thenRun(() -> shutdown.complete(Done.getInstance()))), duration200mills, 3) .toMat(TestSink.probe(system), Keep.right()) @@ -217,7 +216,7 @@ public void restartUptoMaxRetries() throws Exception { assertEquals(1, startedCount.get()); Thread.sleep(500); - assertEquals(true, shutdown.isCompleted()); + assertEquals(true, shutdown.isDone()); Thread.sleep(500); probe.requestNext("a"); diff --git a/persistence/src/main/java/org/apache/pekko/persistence/journal/japi/AsyncWritePlugin.java b/persistence/src/main/java/org/apache/pekko/persistence/journal/japi/AsyncWritePlugin.java index 3eba7b5cde0..3173585c840 100644 --- a/persistence/src/main/java/org/apache/pekko/persistence/journal/japi/AsyncWritePlugin.java +++ b/persistence/src/main/java/org/apache/pekko/persistence/journal/japi/AsyncWritePlugin.java @@ -14,8 +14,7 @@ package org.apache.pekko.persistence.journal.japi; import java.util.Optional; - -import scala.concurrent.Future; +import java.util.concurrent.CompletionStage; import org.apache.pekko.persistence.*; @@ -73,7 +72,8 @@ interface AsyncWritePlugin { * *

This call is protected with a circuit-breaker. */ - Future>> doAsyncWriteMessages(Iterable messages); + CompletionStage>> doAsyncWriteMessages( + Iterable messages); /** * Java API, Plugin API: synchronously deletes all persistent messages up to `toSequenceNr`. @@ -82,6 +82,6 @@ interface AsyncWritePlugin { * * @see AsyncRecoveryPlugin */ - Future doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr); + CompletionStage doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr); // #async-write-plugin-api } diff --git a/persistence/src/main/java/org/apache/pekko/persistence/snapshot/japi/SnapshotStorePlugin.java b/persistence/src/main/java/org/apache/pekko/persistence/snapshot/japi/SnapshotStorePlugin.java index 20313bb3184..047ce99bbfd 100644 --- a/persistence/src/main/java/org/apache/pekko/persistence/snapshot/japi/SnapshotStorePlugin.java +++ b/persistence/src/main/java/org/apache/pekko/persistence/snapshot/japi/SnapshotStorePlugin.java @@ -19,6 +19,7 @@ import scala.concurrent.Future; import java.util.Optional; +import java.util.concurrent.CompletionStage; interface SnapshotStorePlugin { // #snapshot-store-plugin-api @@ -28,7 +29,7 @@ interface SnapshotStorePlugin { * @param persistenceId id of the persistent actor. * @param criteria selection criteria for loading. */ - Future> doLoadAsync( + CompletionStage> doLoadAsync( String persistenceId, SnapshotSelectionCriteria criteria); /** @@ -37,14 +38,14 @@ Future> doLoadAsync( * @param metadata snapshot metadata. * @param snapshot snapshot. */ - Future doSaveAsync(SnapshotMetadata metadata, Object snapshot); + CompletionStage doSaveAsync(SnapshotMetadata metadata, Object snapshot); /** * Java API, Plugin API: deletes the snapshot identified by `metadata`. * * @param metadata snapshot metadata. */ - Future doDeleteAsync(SnapshotMetadata metadata); + CompletionStage doDeleteAsync(SnapshotMetadata metadata); /** * Java API, Plugin API: deletes all snapshots matching `criteria`. @@ -52,6 +53,6 @@ Future> doLoadAsync( * @param persistenceId id of the persistent actor. * @param criteria selection criteria for deleting. */ - Future doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria); + CompletionStage doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria); // #snapshot-store-plugin-api } diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncWriteJournal.scala b/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncWriteJournal.scala index ef74c5302a7..7fbb6e76059 100644 --- a/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncWriteJournal.scala +++ b/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncWriteJournal.scala @@ -21,6 +21,7 @@ import scala.util.Try import org.apache.pekko import pekko.persistence._ import pekko.persistence.journal.{ AsyncWriteJournal => SAsyncWriteJournal } +import pekko.util.FutureConverters._ import pekko.util.ccompat._ import pekko.util.ccompat.JavaConverters._ @@ -33,7 +34,7 @@ abstract class AsyncWriteJournal extends AsyncRecovery with SAsyncWriteJournal w import context.dispatcher final def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): Future[immutable.Seq[Try[Unit]]] = - doAsyncWriteMessages(messages.asJava).map { results => + doAsyncWriteMessages(messages.asJava).asScala.map { results => results.asScala.iterator .map { r => if (r.isPresent) Failure(r.get) @@ -42,6 +43,7 @@ abstract class AsyncWriteJournal extends AsyncRecovery with SAsyncWriteJournal w .to(immutable.IndexedSeq) } - final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long) = - doAsyncDeleteMessagesTo(persistenceId, toSequenceNr).map(_ => ()) + final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = { + doAsyncDeleteMessagesTo(persistenceId, toSequenceNr).asScala.map(_ => ()) + } } diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/snapshot/japi/SnapshotStore.scala b/persistence/src/main/scala/org/apache/pekko/persistence/snapshot/japi/SnapshotStore.scala index ca7f1e9d9fa..fd35c6af74b 100644 --- a/persistence/src/main/scala/org/apache/pekko/persistence/snapshot/japi/SnapshotStore.scala +++ b/persistence/src/main/scala/org/apache/pekko/persistence/snapshot/japi/SnapshotStore.scala @@ -16,6 +16,7 @@ package org.apache.pekko.persistence.snapshot.japi import scala.concurrent.Future import org.apache.pekko +import pekko.util.FutureConverters._ import pekko.japi.Util._ import pekko.persistence._ import pekko.persistence.snapshot.{ SnapshotStore => SSnapshotStore } @@ -29,15 +30,15 @@ abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin { override final def loadAsync( persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = - doLoadAsync(persistenceId, criteria).map(option) + doLoadAsync(persistenceId, criteria).asScala.map(option) override final def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = - doSaveAsync(metadata, snapshot).map(_ => ()) + doSaveAsync(metadata, snapshot).asScala.map(_ => ()) override final def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = - doDeleteAsync(metadata).map(_ => ()) + doDeleteAsync(metadata).asScala.map(_ => ()) override final def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = - doDeleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria).map(_ => ()) + doDeleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria).asScala.map(_ => ()) }