Skip to content

Commit

Permalink
Do not create new write actions for RocksDB if values exist in both t…
Browse files Browse the repository at this point in the history
…he new as old version.
  • Loading branch information
jurmous committed Sep 28, 2024
1 parent 306f875 commit 370395d
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 74 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package maryk.datastore.rocksdb.processors.helpers

import maryk.core.properties.types.Key
import maryk.datastore.rocksdb.TableColumnFamilies
import maryk.datastore.rocksdb.Transaction
import maryk.lib.extensions.compare.compareDefinedTo
import org.rocksdb.ReadOptions

internal fun getCurrentValues(
transaction: Transaction,
columnFamilies: TableColumnFamilies,
key: Key<*>,
referenceAsBytes: ByteArray
): List<Pair<ByteArray, ByteArray>> {
val prefix = key.bytes + referenceAsBytes
val currentValues = mutableListOf<Pair<ByteArray, ByteArray>>()

val iterator = transaction.getIterator(ReadOptions(), columnFamilies.table)
iterator.seek(prefix)

while (iterator.isValid() && prefix.compareDefinedTo(iterator.key()) == 0) {
currentValues.add(iterator.key() to iterator.value())
iterator.next()
}

return currentValues
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ internal fun setTypedValue(
columnFamilies: TableColumnFamilies,
key: Key<*>,
reference: ByteArray,
versionBytes: ByteArray
versionBytes: ByteArray,
qualifiersToKeep: MutableList<ByteArray>? = null,
shouldWrite: ((referenceBytes: ByteArray, valueBytes: ByteArray) -> Boolean)? = null,
) {
val properValue = if (value is MultiTypeEnum<*>) {
TypedValue(value, Unit)
Expand All @@ -31,11 +33,14 @@ internal fun setTypedValue(
val typeDefinition = TypeValue.castDefinition(definition)

var index = 0
qualifiersToKeep?.add(reference)
if (typedValue.value == Unit) {
val valueBytes = ByteArray(typedValue.type.index.calculateVarIntWithExtraInfoByteSize())
typedValue.type.index.writeVarIntWithExtraInfo(TypeIndicator.ComplexTypeIndicator.byte) { valueBytes[index++] = it }

setValue(transaction, columnFamilies, key, reference, versionBytes, valueBytes)
if (shouldWrite?.invoke(reference, valueBytes) != false) {
setValue(transaction, columnFamilies, key, reference, versionBytes, valueBytes)
}
} else {
val typeValueDefinition = typeDefinition.definition(typedValue.type) as IsSimpleValueDefinition<Any, *>
val valueBytes = ByteArray(
Expand All @@ -47,13 +52,15 @@ internal fun setTypedValue(
typedValue.type.index.writeVarIntWithExtraInfo(TypeIndicator.SimpleTypeIndicator.byte, writer)
typeValueDefinition.writeStorageBytes(typedValue.value, writer)

setValue(
transaction,
columnFamilies,
key,
reference,
versionBytes,
valueBytes
)
if (shouldWrite?.invoke(reference, valueBytes) != false) {
setValue(
transaction,
columnFamilies,
key,
reference,
versionBytes,
valueBytes
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package maryk.datastore.rocksdb.processors.helpers

import maryk.core.properties.types.Key
import maryk.datastore.rocksdb.TableColumnFamilies
import maryk.datastore.rocksdb.Transaction
import maryk.datastore.shared.TypeIndicator
import maryk.lib.extensions.compare.compareTo
import maryk.lib.extensions.compare.compareToWithOffsetLength

internal fun unsetNonChangedValues(
transaction: Transaction,
columnFamilies: TableColumnFamilies,
key: Key<*>,
currentValues: List<Pair<ByteArray, ByteArray>>,
qualifiersToKeep: List<ByteArray>,
versionBytes: ByteArray
) {
val sortedQualifiersToKeep = qualifiersToKeep.sortedWith { o1, o2 -> o1.compareTo(o2) }
var minIndex = 0

for ((qualifier, _) in currentValues) {
val index = sortedQualifiersToKeep.binarySearch(fromIndex = minIndex) {
it.compareToWithOffsetLength(qualifier, key.bytes.size, qualifier.size - key.bytes.size)
}
if (index < 0) {
// Delete the value by setting it to the DeletedIndicator
setValue(transaction, columnFamilies, qualifier, versionBytes, TypeIndicator.DeletedIndicator.byteArray)
} else {
// Start next time comparing with next value in qualifiersToKeep as they are ordered
minIndex = index + 1
}
}
}
Loading

0 comments on commit 370395d

Please sign in to comment.