Skip to content

Commit

Permalink
added support for BulkWrite in Mongo
Browse files Browse the repository at this point in the history
  • Loading branch information
uberto committed Sep 5, 2024
1 parent 732a11d commit fc6beca
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.ubertob.kondor.mongo.core

import org.bson.conversions.Bson

sealed class MongoBulkOperation<T : Any> { //!!! find better name but MongoOperation is already taken... MongoWrite ?

data class Insert<T : Any>(val document: T) : MongoBulkOperation<T>()
data class Update<T : Any>(val filter: Bson, val update: Bson) : MongoBulkOperation<T>()
data class Delete<T : Any>(val filter: Bson) : MongoBulkOperation<T>()
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.ubertob.kondor.mongo.core

import com.mongodb.bulk.BulkWriteResult
import com.mongodb.client.MongoCollection
import com.mongodb.client.MongoDatabase
import com.mongodb.client.model.*
Expand Down Expand Up @@ -105,16 +106,29 @@ class MongoDbSession(
?.orThrow()
}

//!!! todo support bulkWrite
// override fun <T : Any, U : Any> MongoTable<T>.bulkWrite(
// collection: List<U>,
// options: BulkWriteOptions,
// operation: (U) -> Bson
// ): BulkWriteResult =
// internalRun { coll ->
// val writes: List<WriteModel<BsonDocument>> = collection.map { operation(it) as WriteModel<BsonDocument> }
// coll.bulkWrite(writes, options)
// }
override fun <T : Any, U : Any> MongoTable<T>.bulkWrite(
collection: List<U>,
options: BulkWriteOptions,
operation: (U) -> MongoBulkOperation<T>
): BulkWriteResult =
bulkWrite(collection.map(operation), options)


override fun <T : Any> MongoTable<T>.bulkWrite(
operations: Iterable<MongoBulkOperation<T>>,
options: BulkWriteOptions
): BulkWriteResult =

internalRun { coll ->
val bulkOperations = operations.map { operation ->
when (operation) {
is MongoBulkOperation.Insert -> InsertOneModel(toBsonDoc(operation.document))
is MongoBulkOperation.Update -> UpdateOneModel(operation.filter, operation.update)
is MongoBulkOperation.Delete -> DeleteOneModel(operation.filter)
}
}
coll.bulkWrite(bulkOperations)
}

override fun <T : Any> MongoTable<T>.findById(id: Any): T? =
internalRun { coll ->
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.ubertob.kondor.mongo.core

import com.mongodb.bulk.BulkWriteResult
import com.mongodb.client.MongoCollection
import com.mongodb.client.model.*
import org.bson.BsonDocument
Expand Down Expand Up @@ -40,12 +41,16 @@ interface MongoSession {
options: FindOneAndDeleteOptions = FindOneAndDeleteOptions()
): T?

//!!! todo
// fun <T : Any> MongoTable<T>.bulkWrite(
// collection: List<T>,
// options: BulkWriteOptions,
// operation: (T) -> WriteModel<BsonDocument>
// ): BulkWriteResult
fun <T : Any> MongoTable<T>.bulkWrite(
operations: Iterable<MongoBulkOperation<T>>,
options: BulkWriteOptions = BulkWriteOptions()
): BulkWriteResult

fun <T : Any, U : Any> MongoTable<T>.bulkWrite(
collection: List<U>,
options: BulkWriteOptions = BulkWriteOptions(),
operation: (U) -> MongoBulkOperation<T>
): BulkWriteResult

//Query Methods
fun <T : Any> MongoTable<T>.findById(id: Any): T?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,26 +254,30 @@ class FlatDocTableTest {
expectThat(missing).isEqualTo(null)
}

@Test
fun `bulkUpdate execute multiple writes in a single go`() {

onMongo(cleanUp).expectSuccess()

val hundredDocs = (1..100).map {
createDoc(it)
}

//TODO !!! add bulkWrite support to kmongo

// @Test
// fun `bulkUpdate execute multiple writes in a single go`() {
//
// onMongo(cleanUp + hundredDocWriter).expectSuccess()
//
// onMongo(mongoOperation {
// FlatDocs.bulkWrite()(
// JSimpleFlatDoc.index `in` setOf(43, 73),
// Updates.set("name", "updated doc")
// )
// }).expectSuccess()
//
// val newDoc43 = onMongo(reader(43)).expectSuccess()
// expectThat(newDoc43?.name).isEqualTo("updated doc")
// val newDoc73 = onMongo(reader(73)).expectSuccess()
// expectThat(newDoc73?.name).isEqualTo("updated doc")
// }
val tot = onMongo(
mongoOperation {
FlatDocs.bulkWrite(hundredDocs) { doc ->
MongoBulkOperation.Insert(doc)
}.insertedCount
}).expectSuccess()

expectThat(tot).isEqualTo(100)

val doc43 = onMongo(reader(43)).expectSuccess()
expectThat(doc43?.name).isEqualTo("mydoc 43")
val doc99 = onMongo(reader(99)).expectSuccess()
expectThat(doc99?.name).isEqualTo("mydoc 99")

}

@Test
fun `watch should report the changes`() {
Expand All @@ -292,7 +296,7 @@ class FlatDocTableTest {

expectThat(docTot).isEqualTo(3)

// this is not working, not sure why
// this is not working, not sure why !!!
watcher.asSequence().onEach {
println("seq!!")
println(it)
Expand Down

0 comments on commit fc6beca

Please sign in to comment.