diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala index cac127a8..9577cb23 100644 --- a/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala +++ b/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala @@ -55,21 +55,6 @@ import reactor.core.publisher.Mono .asFuture() } - /** - * Batch update of SQL statements without bind parameters. - */ - def updateBatchInTx(conn: Connection, statements: immutable.IndexedSeq[String])(implicit - ec: ExecutionContext): Future[Int] = { - val batch = conn.createBatch() - statements.foreach(batch.add) - val consumer: BiConsumer[Int, Integer] = (acc, elem) => acc + elem.intValue() - Flux - .from[Result](batch.execute()) - .concatMap(_.getRowsUpdated) - .collect(() => 0, consumer) - .asFuture() - } - def updateInTx(statements: immutable.IndexedSeq[Statement])(implicit ec: ExecutionContext): Future[immutable.IndexedSeq[Int]] = // connection not intended for concurrent calls, make sure statements are executed one at a time diff --git a/projection/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala b/projection/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala index 4f60faa4..404ff667 100644 --- a/projection/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala +++ b/projection/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala @@ -186,22 +186,6 @@ private[projection] class R2dbcOffsetStore( (projection_name, projection_key, slice, persistence_id, seq_nr, timestamp_offset, timestamp_consumed) VALUES (?,?,?,?,?,?, transaction_timestamp())""" - private def insertTimestampOffsetBatchSql(pid: Pid, seqNr: SeqNr, offsetTimestamp: Instant): String = { - def validateStringParam(name: String, value: String): Unit = { - if (value.contains('\'')) - throw new IllegalArgumentException(s"Illegal $name parameter [$value]") - } - validateStringParam("projectionId.name", projectionId.name) - validateStringParam("projectionId.key", projectionId.key) - validateStringParam("pid", pid) - - val slice = persistenceExt.sliceForPersistenceId(pid) - sql""" - INSERT INTO $timestampOffsetTable - (projection_name, projection_key, slice, persistence_id, seq_nr, timestamp_offset, timestamp_consumed) - VALUES ('${projectionId.name}','${projectionId.key}',$slice,'$pid',$seqNr,'$offsetTimestamp', transaction_timestamp())""" - } - // delete less than a timestamp private val deleteOldTimestampOffsetSql: String = sql"DELETE FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ? AND timestamp_offset < ?" @@ -515,15 +499,19 @@ private[projection] class R2dbcOffsetStore( // FIXME change to trace logger.debug("saving timestamp offset [{}], {}", records.last.timestamp, records) + val statement = conn.createStatement(insertTimestampOffsetSql) + if (records.size == 1) { - val statement = conn.createStatement(insertTimestampOffsetSql) val boundStatement = bindRecord(statement, records.head) R2dbcExecutor.updateOneInTx(boundStatement) } else { - val statements = records.map { rec => - insertTimestampOffsetBatchSql(rec.pid, rec.seqNr, rec.timestamp) - } - R2dbcExecutor.updateBatchInTx(conn, statements) + // TODO Try Batch without bind parameters for better performance. Risk of sql injection for these parameters is low. + val boundStatement = + records.foldLeft(statement) { (stmt, rec) => + stmt.add() + bindRecord(stmt, rec) + } + R2dbcExecutor.updateBatchInTx(boundStatement) } }