Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Save offsets in Batch without bind parameters, #253 (#255)" #259

Merged
merged 1 commit into from
Jul 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,21 +55,6 @@ import reactor.core.publisher.Mono
.asFuture()
}

/**
* Batch update of SQL statements without bind parameters.
*/
def updateBatchInTx(conn: Connection, statements: immutable.IndexedSeq[String])(implicit
ec: ExecutionContext): Future[Int] = {
val batch = conn.createBatch()
statements.foreach(batch.add)
val consumer: BiConsumer[Int, Integer] = (acc, elem) => acc + elem.intValue()
Flux
.from[Result](batch.execute())
.concatMap(_.getRowsUpdated)
.collect(() => 0, consumer)
.asFuture()
}

def updateInTx(statements: immutable.IndexedSeq[Statement])(implicit
ec: ExecutionContext): Future[immutable.IndexedSeq[Int]] =
// connection not intended for concurrent calls, make sure statements are executed one at a time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,22 +186,6 @@ private[projection] class R2dbcOffsetStore(
(projection_name, projection_key, slice, persistence_id, seq_nr, timestamp_offset, timestamp_consumed)
VALUES (?,?,?,?,?,?, transaction_timestamp())"""

private def insertTimestampOffsetBatchSql(pid: Pid, seqNr: SeqNr, offsetTimestamp: Instant): String = {
def validateStringParam(name: String, value: String): Unit = {
if (value.contains('\''))
throw new IllegalArgumentException(s"Illegal $name parameter [$value]")
}
validateStringParam("projectionId.name", projectionId.name)
validateStringParam("projectionId.key", projectionId.key)
validateStringParam("pid", pid)

val slice = persistenceExt.sliceForPersistenceId(pid)
sql"""
INSERT INTO $timestampOffsetTable
(projection_name, projection_key, slice, persistence_id, seq_nr, timestamp_offset, timestamp_consumed)
VALUES ('${projectionId.name}','${projectionId.key}',$slice,'$pid',$seqNr,'$offsetTimestamp', transaction_timestamp())"""
}

// delete less than a timestamp
private val deleteOldTimestampOffsetSql: String =
sql"DELETE FROM $timestampOffsetTable WHERE slice BETWEEN ? AND ? AND projection_name = ? AND timestamp_offset < ?"
Expand Down Expand Up @@ -515,15 +499,19 @@ private[projection] class R2dbcOffsetStore(
// FIXME change to trace
logger.debug("saving timestamp offset [{}], {}", records.last.timestamp, records)

val statement = conn.createStatement(insertTimestampOffsetSql)

if (records.size == 1) {
val statement = conn.createStatement(insertTimestampOffsetSql)
val boundStatement = bindRecord(statement, records.head)
R2dbcExecutor.updateOneInTx(boundStatement)
} else {
val statements = records.map { rec =>
insertTimestampOffsetBatchSql(rec.pid, rec.seqNr, rec.timestamp)
}
R2dbcExecutor.updateBatchInTx(conn, statements)
// TODO Try Batch without bind parameters for better performance. Risk of sql injection for these parameters is low.
val boundStatement =
records.foldLeft(statement) { (stmt, rec) =>
stmt.add()
bindRecord(stmt, rec)
}
R2dbcExecutor.updateBatchInTx(boundStatement)
}
}

Expand Down