Skip to content

Commit

Permalink
Drop local implementations of setup operator
Browse files Browse the repository at this point in the history
  • Loading branch information
ennru committed Jun 19, 2019
1 parent 7963773 commit e925403
Show file tree
Hide file tree
Showing 18 changed files with 473 additions and 1,485 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package akka.stream.alpakka.couchbase.scaladsl
import akka.NotUsed
import akka.stream.alpakka.couchbase.impl.Setup
import akka.stream.alpakka.couchbase._
import akka.stream.scaladsl.Flow
import com.couchbase.client.java.document.{Document, JsonDocument}
Expand All @@ -18,12 +17,13 @@ object CouchbaseFlow {
* Create a flow to query Couchbase for by `id` and emit [[com.couchbase.client.java.document.JsonDocument JsonDocument]]s.
*/
def fromId(sessionSettings: CouchbaseSessionSettings, bucketName: String): Flow[String, JsonDocument, NotUsed] =
Setup
.flow { materializer => _ =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[String]
.mapAsync(1)(id => session.flatMap(_.get(id /* timeout? */ ))(materializer.system.dispatcher))
.collect { case Some(doc) => doc }
Flow
.setup {
case (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[String]
.mapAsync(1)(id => session.flatMap(_.get(id /* timeout? */ ))(materializer.system.dispatcher))
.collect { case Some(doc) => doc }
}
.mapMaterializedValue(_ => NotUsed)

Expand All @@ -33,12 +33,13 @@ object CouchbaseFlow {
def fromId[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
bucketName: String,
target: Class[T]): Flow[String, T, NotUsed] =
Setup
.flow { materializer => _ =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[String]
.mapAsync(1)(id => session.flatMap(_.get(id /* timeout? */, target))(materializer.system.dispatcher))
.collect { case Some(doc) => doc }
Flow
.setup {
case (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[String]
.mapAsync(1)(id => session.flatMap(_.get(id /* timeout? */, target))(materializer.system.dispatcher))
.collect { case Some(doc) => doc }
}
.mapMaterializedValue(_ => NotUsed)

Expand All @@ -48,13 +49,14 @@ object CouchbaseFlow {
def upsert(sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[JsonDocument, JsonDocument, NotUsed] =
Setup
.flow { materializer => _ =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[JsonDocument]
.mapAsync(writeSettings.parallelism)(
doc => session.flatMap(_.upsert(doc, writeSettings))(materializer.system.dispatcher)
)
Flow
.setup {
case (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[JsonDocument]
.mapAsync(writeSettings.parallelism)(
doc => session.flatMap(_.upsert(doc, writeSettings))(materializer.system.dispatcher)
)
}
.mapMaterializedValue(_ => NotUsed)

Expand All @@ -64,13 +66,14 @@ object CouchbaseFlow {
def upsertDoc[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[T, T, NotUsed] =
Setup
.flow { materializer => _ =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[T]
.mapAsync(writeSettings.parallelism)(
doc => session.flatMap(_.upsertDoc(doc, writeSettings))(materializer.system.dispatcher)
)
Flow
.setup {
case (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[T]
.mapAsync(writeSettings.parallelism)(
doc => session.flatMap(_.upsertDoc(doc, writeSettings))(materializer.system.dispatcher)
)
}
.mapMaterializedValue(_ => NotUsed)

Expand All @@ -81,21 +84,22 @@ object CouchbaseFlow {
def upsertDocWithResult[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] =
Setup
.flow { materializer => _ =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[T]
.mapAsync(writeSettings.parallelism)(
doc => {
implicit val executor = materializer.system.dispatcher
session
.flatMap(_.upsertDoc(doc, writeSettings))
.map(_ => CouchbaseWriteSuccess(doc))
.recover {
case exception => CouchbaseWriteFailure(doc, exception)
}
}
)
Flow
.setup {
case (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[T]
.mapAsync(writeSettings.parallelism)(
doc => {
implicit val executor = materializer.system.dispatcher
session
.flatMap(_.upsertDoc(doc, writeSettings))
.map(_ => CouchbaseWriteSuccess(doc))
.recover {
case exception => CouchbaseWriteFailure(doc, exception)
}
}
)
}
.mapMaterializedValue(_ => NotUsed)

Expand All @@ -105,18 +109,19 @@ object CouchbaseFlow {
def delete(sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[String, String, NotUsed] =
Setup
.flow { materializer => _ =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[String]
.mapAsync(writeSettings.parallelism)(
id => {
implicit val executor = materializer.system.dispatcher
session
.flatMap(_.remove(id, writeSettings))
.map(_ => id)
}
)
Flow
.setup {
case (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[String]
.mapAsync(writeSettings.parallelism)(
id => {
implicit val executor = materializer.system.dispatcher
session
.flatMap(_.remove(id, writeSettings))
.map(_ => id)
}
)
}
.mapMaterializedValue(_ => NotUsed)

Expand All @@ -126,21 +131,22 @@ object CouchbaseFlow {
def deleteWithResult(sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[String, CouchbaseDeleteResult, NotUsed] =
Setup
.flow { materializer => _ =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[String]
.mapAsync(writeSettings.parallelism)(
id => {
implicit val executor = materializer.system.dispatcher
session
.flatMap(_.remove(id, writeSettings))
.map(_ => CouchbaseDeleteSuccess(id))
.recover {
case exception => CouchbaseDeleteFailure(id, exception)
}
}
)
Flow
.setup {
case (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[String]
.mapAsync(writeSettings.parallelism)(
id => {
implicit val executor = materializer.system.dispatcher
session
.flatMap(_.remove(id, writeSettings))
.map(_ => CouchbaseDeleteSuccess(id))
.recover {
case exception => CouchbaseDeleteFailure(id, exception)
}
}
)
}
.mapMaterializedValue(_ => NotUsed)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package akka.stream.alpakka.couchbase.scaladsl

import akka.NotUsed
import akka.stream.alpakka.couchbase.impl.Setup
import akka.stream.alpakka.couchbase.{CouchbaseSessionRegistry, CouchbaseSessionSettings}
import akka.stream.scaladsl.Source
import com.couchbase.client.java.document.json.JsonObject
Expand All @@ -22,12 +21,13 @@ object CouchbaseSource {
def fromStatement(sessionSettings: CouchbaseSessionSettings,
statement: Statement,
bucketName: String): Source[JsonObject, NotUsed] =
Setup
.source { materializer => _ =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Source
.fromFuture(session.map(_.streamedQuery(statement))(materializer.system.dispatcher))
.flatMapConcat(identity)
Source
.setup {
case (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Source
.fromFuture(session.map(_.streamedQuery(statement))(materializer.system.dispatcher))
.flatMapConcat(identity)
}
.mapMaterializedValue(_ => NotUsed)

Expand All @@ -37,12 +37,13 @@ object CouchbaseSource {
def fromN1qlQuery(sessionSettings: CouchbaseSessionSettings,
query: N1qlQuery,
bucketName: String): Source[JsonObject, NotUsed] =
Setup
.source { materializer => _ =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Source
.fromFuture(session.map(_.streamedQuery(query))(materializer.system.dispatcher))
.flatMapConcat(identity)
Source
.setup {
case (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Source
.fromFuture(session.map(_.streamedQuery(query))(materializer.system.dispatcher))
.flatMapConcat(identity)
}
.mapMaterializedValue(_ => NotUsed)

Expand Down
Loading

0 comments on commit e925403

Please sign in to comment.