Skip to content

Commit

Permalink
Use Akka's setup operator (#1765)
Browse files Browse the repository at this point in the history
* Drop local implementations of setup operator
* Java formatting
* MiMa filters
  • Loading branch information
ennru authored Jun 24, 2019
2 parents c137fcf + df75405 commit 1d9899c
Show file tree
Hide file tree
Showing 38 changed files with 232 additions and 1,336 deletions.
15 changes: 3 additions & 12 deletions amqp/src/test/java/docs/javadsl/AmqpDocsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,7 @@ public void publishAndConsume() throws Exception {

assertEquals(
input,
result
.toCompletableFuture()
.get(3, TimeUnit.SECONDS)
.stream()
result.toCompletableFuture().get(3, TimeUnit.SECONDS).stream()
.map(m -> m.bytes().utf8String())
.collect(Collectors.toList()));
}
Expand Down Expand Up @@ -278,10 +275,7 @@ public void publishAndConsumeWithoutAutoAck() throws Exception {

assertEquals(
input,
result
.toCompletableFuture()
.get(3, TimeUnit.SECONDS)
.stream()
result.toCompletableFuture().get(3, TimeUnit.SECONDS).stream()
.map(m -> m.bytes().utf8String())
.collect(Collectors.toList()));
}
Expand Down Expand Up @@ -335,10 +329,7 @@ public void republishMessageWithoutAutoAckIfNacked() throws Exception {

assertEquals(
input,
result2
.toCompletableFuture()
.get(10, TimeUnit.SECONDS)
.stream()
result2.toCompletableFuture().get(10, TimeUnit.SECONDS).stream()
.map(m -> m.message().bytes().utf8String())
.collect(Collectors.toList()));

Expand Down
19 changes: 4 additions & 15 deletions cassandra/src/test/java/docs/javadsl/CassandraSourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,7 @@ public void streamStatementResult() throws Exception {

assertEquals(
IntStream.range(1, 103).boxed().collect(Collectors.toSet()),
rows.toCompletableFuture()
.get(3, TimeUnit.SECONDS)
.stream()
rows.toCompletableFuture().get(3, TimeUnit.SECONDS).stream()
.map(r -> r.getInt("id"))
.collect(Collectors.toSet()));
}
Expand Down Expand Up @@ -149,10 +147,7 @@ public void flowInputValues() throws Exception {

List<Integer> resultToAssert = result.toCompletableFuture().get();
Set<Integer> found =
session
.execute("select * from akka_stream_java_test.test")
.all()
.stream()
session.execute("select * from akka_stream_java_test.test").all().stream()
.map(r -> r.getInt("id"))
.collect(Collectors.toSet());

Expand Down Expand Up @@ -194,10 +189,7 @@ public void flowBatchInputValues() throws Exception {
Set<Integer> resultToAssert =
result.toCompletableFuture().get().stream().map(ti -> ti.cc).collect(Collectors.toSet());
Set<Integer> found =
session
.execute("select * from akka_stream_java_test.test_batch")
.all()
.stream()
session.execute("select * from akka_stream_java_test.test_batch").all().stream()
.map(r -> r.getInt("cc"))
.collect(Collectors.toSet());

Expand Down Expand Up @@ -231,10 +223,7 @@ public void sinkInputValues() throws Exception {
result.toCompletableFuture().get();

Set<Integer> found =
session
.execute("select * from akka_stream_java_test.test")
.all()
.stream()
session.execute("select * from akka_stream_java_test.test").all().stream()
.map(r -> r.getInt("id"))
.collect(Collectors.toSet());
assertEquals(found, IntStream.range(1, 10).boxed().collect(Collectors.toSet()));
Expand Down
2 changes: 2 additions & 0 deletions couchbase/src/main/mima-filters/1.0.x.backwards.excludes
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Allow changes to impl
ProblemFilters.exclude[Problem]("akka.stream.alpakka.couchbase.impl.*")

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package akka.stream.alpakka.couchbase.scaladsl
import akka.NotUsed
import akka.stream.alpakka.couchbase.impl.Setup
import akka.stream.alpakka.couchbase._
import akka.stream.scaladsl.Flow
import com.couchbase.client.java.document.{Document, JsonDocument}
Expand All @@ -18,8 +17,8 @@ object CouchbaseFlow {
* Create a flow to query Couchbase for by `id` and emit [[com.couchbase.client.java.document.JsonDocument JsonDocument]]s.
*/
def fromId(sessionSettings: CouchbaseSessionSettings, bucketName: String): Flow[String, JsonDocument, NotUsed] =
Setup
.flow { materializer => _ =>
Flow
.setup { (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[String]
.mapAsync(1)(id => session.flatMap(_.get(id /* timeout? */ ))(materializer.system.dispatcher))
Expand All @@ -33,8 +32,8 @@ object CouchbaseFlow {
def fromId[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
bucketName: String,
target: Class[T]): Flow[String, T, NotUsed] =
Setup
.flow { materializer => _ =>
Flow
.setup { (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[String]
.mapAsync(1)(id => session.flatMap(_.get(id /* timeout? */, target))(materializer.system.dispatcher))
Expand All @@ -48,8 +47,8 @@ object CouchbaseFlow {
def upsert(sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[JsonDocument, JsonDocument, NotUsed] =
Setup
.flow { materializer => _ =>
Flow
.setup { (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[JsonDocument]
.mapAsync(writeSettings.parallelism)(
Expand All @@ -64,8 +63,8 @@ object CouchbaseFlow {
def upsertDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[T, T, NotUsed] =
Setup
.flow { materializer => _ =>
Flow
.setup { (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[T]
.mapAsync(writeSettings.parallelism)(
Expand All @@ -81,8 +80,8 @@ object CouchbaseFlow {
def upsertDocWithResult[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] =
Setup
.flow { materializer => _ =>
Flow
.setup { (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[T]
.mapAsync(writeSettings.parallelism)(
Expand All @@ -105,8 +104,8 @@ object CouchbaseFlow {
def delete(sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[String, String, NotUsed] =
Setup
.flow { materializer => _ =>
Flow
.setup { (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[String]
.mapAsync(writeSettings.parallelism)(
Expand All @@ -126,8 +125,8 @@ object CouchbaseFlow {
def deleteWithResult(sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[String, CouchbaseDeleteResult, NotUsed] =
Setup
.flow { materializer => _ =>
Flow
.setup { (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[String]
.mapAsync(writeSettings.parallelism)(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package akka.stream.alpakka.couchbase.scaladsl

import akka.NotUsed
import akka.stream.alpakka.couchbase.impl.Setup
import akka.stream.alpakka.couchbase.{CouchbaseSessionRegistry, CouchbaseSessionSettings}
import akka.stream.scaladsl.Source
import com.couchbase.client.java.document.json.JsonObject
Expand All @@ -22,8 +21,8 @@ object CouchbaseSource {
def fromStatement(sessionSettings: CouchbaseSessionSettings,
statement: Statement,
bucketName: String): Source[JsonObject, NotUsed] =
Setup
.source { materializer => _ =>
Source
.setup { (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Source
.fromFuture(session.map(_.streamedQuery(statement))(materializer.system.dispatcher))
Expand All @@ -37,8 +36,8 @@ object CouchbaseSource {
def fromN1qlQuery(sessionSettings: CouchbaseSessionSettings,
query: N1qlQuery,
bucketName: String): Source[JsonObject, NotUsed] =
Setup
.source { materializer => _ =>
Source
.setup { (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Source
.fromFuture(session.map(_.streamedQuery(query))(materializer.system.dispatcher))
Expand Down
2 changes: 2 additions & 0 deletions dynamodb/src/main/mima-filters/1.0.x.backwards.excludes
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Allow changes to impl
ProblemFilters.exclude[Problem]("akka.stream.alpakka.dynamodb.impl.*")
Loading

0 comments on commit 1d9899c

Please sign in to comment.