From 2ea412112debf7bb6a1ef75a9ac68cfe1bbe0fec Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Fri, 1 Jul 2022 16:09:31 +0200
Subject: [PATCH] Save offsets in Batch without bind parameters, #253 (#255)

* for better performance since the add-bind approach has round-trips to the db
* basic validation of string parameters
---
 .../r2dbc/internal/R2dbcExecutor.scala        | 20 ++++++++++
 .../r2dbc/internal/R2dbcOffsetStore.scala     | 40 +++++++++++++++-----
 2 files changed, 51 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala b/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala
index 9577cb23..cac127a8 100644
--- a/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala
+++ b/core/src/main/scala/akka/persistence/r2dbc/internal/R2dbcExecutor.scala
@@ -55,6 +55,26 @@ import reactor.core.publisher.Mono
       .asFuture()
   }
 
+  /**
+   * Batch update of SQL statements without bind parameters.
+   *
+   * All `statements` are added to one `Batch` created from `conn`. The result is the sum of the
+   * updated row counts reported by the results of the batch execution.
+   */
+  def updateBatchInTx(conn: Connection, statements: immutable.IndexedSeq[String])(implicit
+      ec: ExecutionContext): Future[Int] = {
+    val batch = conn.createBatch()
+    statements.foreach(batch.add)
+    // reduce instead of collect: the value computed by a collect BiConsumer is discarded and the
+    // Int container cannot be mutated in place, so collect would always yield the initial 0
+    val sum: java.util.function.BiFunction[Int, Integer, Int] = (acc, elem) => acc + elem.intValue()
+    Flux
+      .from[Result](batch.execute())
+      .concatMap(_.getRowsUpdated)
+      .reduce[Int](0, sum)
+      .asFuture()
+  }
+
   def updateInTx(statements: immutable.IndexedSeq[Statement])(implicit
       ec: ExecutionContext): Future[immutable.IndexedSeq[Int]] =
     // connection not intended for concurrent calls, make sure statements are executed one at a time
diff --git a/projection/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala b/projection/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala
index 404ff667..4f60faa4 100644
--- a/projection/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala
+++ b/projection/src/main/scala/akka/projection/r2dbc/internal/R2dbcOffsetStore.scala
@@ -186,6 +186,32 @@ private[projection] class R2dbcOffsetStore(
     (projection_name, projection_key, slice, persistence_id, seq_nr, timestamp_offset, timestamp_consumed)
     VALUES (?,?,?,?,?,?, transaction_timestamp())"""
 
+  /**
+   * SQL for inserting one timestamp offset record with the values inlined in the statement text
+   * instead of bind parameters, for use with `R2dbcExecutor.updateBatchInTx`.
+   *
+   * The string values are placed inside single quotes in the SQL, so a value containing `'` is
+   * rejected with an `IllegalArgumentException`.
+   *
+   * NOTE(review): only `'` is checked. How the `sql` interpolator treats other characters of the
+   * inlined values is not visible here -- confirm before allowing arbitrary persistence ids.
+   */
+  private def insertTimestampOffsetBatchSql(pid: Pid, seqNr: SeqNr, offsetTimestamp: Instant): String = {
+    def validateStringParam(name: String, value: String): Unit = {
+      if (value.contains('\''))
+        throw new IllegalArgumentException(s"Illegal $name parameter [$value]")
+    }
+    validateStringParam("projectionId.name", projectionId.name)
+    validateStringParam("projectionId.key", projectionId.key)
+    validateStringParam("pid", pid)
+
+    val slice = persistenceExt.sliceForPersistenceId(pid)
+    sql"""
+      INSERT INTO $timestampOffsetTable
+      (projection_name, projection_key, slice, persistence_id, seq_nr, timestamp_offset, timestamp_consumed)
+      VALUES ('${projectionId.name}','${projectionId.key}',$slice,'$pid',$seqNr,'$offsetTimestamp', transaction_timestamp())"""
+  }
+
   // delete less than a timestamp
   private val deleteOldTimestampOffsetSql: String =
     sql"DELETE FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ? AND timestamp_offset < ?"
@@ -499,19 +525,15 @@ private[projection] class R2dbcOffsetStore(
     // FIXME change to trace
     logger.debug("saving timestamp offset [{}], {}", records.last.timestamp, records)
 
-    val statement = conn.createStatement(insertTimestampOffsetSql)
-
     if (records.size == 1) {
+      val statement = conn.createStatement(insertTimestampOffsetSql)
       val boundStatement = bindRecord(statement, records.head)
       R2dbcExecutor.updateOneInTx(boundStatement)
     } else {
-      // TODO Try Batch without bind parameters for better performance. Risk of sql injection for these parameters is low.
-      val boundStatement =
-        records.foldLeft(statement) { (stmt, rec) =>
-          stmt.add()
-          bindRecord(stmt, rec)
-        }
-      R2dbcExecutor.updateBatchInTx(boundStatement)
+      val statements = records.map { rec =>
+        insertTimestampOffsetBatchSql(rec.pid, rec.seqNr, rec.timestamp)
+      }
+      R2dbcExecutor.updateBatchInTx(conn, statements)
     }
   }
 