Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the state-based counter prototype #138

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions qbit-core/src/commonMain/kotlin/qbit/Conn.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import qbit.index.Indexer
import qbit.index.InternalDb
import qbit.index.RawEntity
import qbit.ns.Namespace
import qbit.resolving.lastWriterWinsResolve
import qbit.resolving.crdtResolve
import qbit.resolving.logsDiff
import qbit.serialization.*
import qbit.spi.Storage
Expand Down Expand Up @@ -122,14 +122,14 @@ class QConn(
}
}

override suspend fun update(trxLog: TrxLog, newLog: TrxLog, newDb: InternalDb) {
override suspend fun update(trxLog: TrxLog, baseDb: InternalDb, newLog: TrxLog, newDb: InternalDb) {
val (log, db) =
if (hasConcurrentTrx(trxLog)) {
mergeLogs(trxLog, this.trxLog, newLog, newDb)
mergeLogs(trxLog, this.trxLog, newLog, baseDb, newDb)
} else {
newLog to newDb
}
storage.overwrite(Namespace("refs")["head"], newLog.hash.bytes)
storage.overwrite(Namespace("refs")["head"], log.hash.bytes)
this.trxLog = log
this.db = db
}
Expand All @@ -141,6 +141,7 @@ class QConn(
baseLog: TrxLog,
committedLog: TrxLog,
committingLog: TrxLog,
baseDb: InternalDb,
newDb: InternalDb
): Pair<TrxLog, InternalDb> {
val logsDifference = logsDiff(baseLog, committedLog, committingLog, resolveNode)
Expand All @@ -149,7 +150,7 @@ class QConn(
.logAEntities()
.toEavsList()
val reconciliationEavs = logsDifference
.reconciliationEntities(lastWriterWinsResolve { db.attr(it) })
.reconciliationEntities(crdtResolve(baseDb::pullEntity, db::attr))
.toEavsList()

val mergedDb = newDb
Expand Down
34 changes: 26 additions & 8 deletions qbit-core/src/commonMain/kotlin/qbit/api/model/DataTypes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import kotlin.reflect.KClass
* - List<Ref>
*/

val scalarRange = 0..31
val listRange = 32..63
val pnCounterRange = 64..95

@Suppress("UNCHECKED_CAST")
sealed class DataType<out T : Any> {

Expand All @@ -31,12 +35,12 @@ sealed class DataType<out T : Any> {
private val values: Array<DataType<*>>
get() = arrayOf(QBoolean, QByte, QInt, QLong, QString, QBytes, QGid, QRef)

fun ofCode(code: Byte): DataType<*>? =
if (code <= 19) {
values.firstOrNull { it.code == code }
} else {
values.map { it.list() }.firstOrNull { it.code == code }
}
fun ofCode(code: Byte): DataType<*>? = when(code) {
in scalarRange -> values.firstOrNull { it.code == code }
in listRange -> values.map { it.list() }.firstOrNull { it.code == code }
in pnCounterRange -> ofCode((code - 64).toByte())?.counter()
else -> null
}

fun <T : Any> ofValue(value: T?): DataType<T>? = when (value) {
is Boolean -> QBoolean as DataType<T>
Expand All @@ -46,7 +50,7 @@ sealed class DataType<out T : Any> {
is String -> QString as DataType<T>
is ByteArray -> QBytes as DataType<T>
is Gid -> QGid as DataType<T>
is List<*> -> value.firstOrNull()?.let { ofValue(it)?.list() } as DataType<T>
is List<*> -> value.firstOrNull()?.let { ofValue(it)?.list() } as DataType<T>?
else -> QRef as DataType<T>
}
}
Expand All @@ -57,7 +61,14 @@ sealed class DataType<out T : Any> {
return QList(this)
}

fun isList(): Boolean = (code.toInt().and(32)) > 0
fun isList(): Boolean = code in listRange

fun counter(): QCounter<T> {
require(this is QByte || this is QInt || this is QLong) { "Only primitive number values are allowed in counters" }
return QCounter(this)
}

fun isCounter(): Boolean = code in pnCounterRange

fun ref(): Boolean = this == QRef || this is QList<*> && this.itemsType == QRef

Expand All @@ -73,6 +84,7 @@ sealed class DataType<out T : Any> {
is QBytes -> ByteArray::class
is QGid -> Gid::class
is QList<*> -> this.itemsType.typeClass()
is QCounter<*> -> this.primitiveType.typeClass()
QRef -> Any::class
}
}
Expand All @@ -85,6 +97,12 @@ data class QList<out I : Any>(val itemsType: DataType<I>) : DataType<List<I>>()

}

data class QCounter<out I : Any>(val primitiveType: DataType<I>) : DataType<I>() {

override val code = (64 + primitiveType.code).toByte()

}

object QBoolean : DataType<Boolean>() {

override val code = 0.toByte()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package qbit.resolving

import kotlinx.coroutines.flow.toList
import qbit.api.Instances
import qbit.api.QBitException
import qbit.api.gid.Gid
import qbit.api.model.Attr
import qbit.api.model.Eav
import qbit.api.model.Hash
import qbit.api.model.*
import qbit.index.RawEntity
import qbit.serialization.*
import qbit.trx.TrxLog
Expand Down Expand Up @@ -72,6 +71,51 @@ internal fun lastWriterWinsResolve(resolveAttrName: (String) -> Attr<Any>?): (Li
}
}

internal fun crdtResolve(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Эммм, это очень странно.
Зачем дублировать логику lastWriterWinsResolve? А тесты не дублировать?

resolveEntity: (Gid) -> StoredEntity?,
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolveBaseEntity

И ваще лучше коммент к методу написать. Как писать хоршие комменты можешь почитать в Philosophy of Software Design и вроде том же Clean Code было

resolveAttrName: (String) -> Attr<Any>?
): (List<PersistedEav>, List<PersistedEav>) -> List<Eav> = { eavsFromA, eavsFromB ->
require(eavsFromA.isNotEmpty()) { "eavsFromA should be not empty" }
require(eavsFromB.isNotEmpty()) { "eavsFromB should be not empty" }

val gid = eavsFromA[0].eav.gid
val attr = resolveAttrName(eavsFromA[0].eav.attr)
?: throw IllegalArgumentException("Cannot resolve ${eavsFromA[0].eav.attr}")

when {
// temporary dirty hack until crdt counter or custom resolution strategy support is implemented
attr == Instances.nextEid -> listOf((eavsFromA + eavsFromB).maxByOrNull { it.eav.value as Int }!!.eav)
attr.list -> (eavsFromA + eavsFromB).map { it.eav }.distinct()
DataType.ofCode(attr.type)!!.isCounter() -> {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Симетрия и постоянство уровней абстракции - очень важные свойства кода.
Почитай Implementation Patterns (Symmetry) и Clean Code (G34: Functions Should Descend Only One Level of Abstraction)
А эту портянку вынеси в отдельный метод

val latestFromA = eavsFromA.maxByOrNull { it.timestamp }!!.eav.value
val latestFromB = eavsFromB.maxByOrNull { it.timestamp }!!.eav.value
val previous = resolveEntity(gid)?.tryGet(attr)

listOf(
if (previous != null)
Eav(
eavsFromA[0].eav.gid,
eavsFromA[0].eav.attr,
if (previous is Byte && latestFromA is Byte && latestFromB is Byte) latestFromA + latestFromB - previous
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это тоже лучше утащить в

operator fun Number.plus(another: Number) = when (this) {
  is Int -> this + another.toInt()
  // ...
}

else if (previous is Int && latestFromA is Int && latestFromB is Int) latestFromA + latestFromB - previous
else if (previous is Long && latestFromA is Long && latestFromB is Long) latestFromA + latestFromB - previous
else throw QBitException("Unexpected counter value type for eav with gid=$gid, attr=$attr")
)
else
Eav(
eavsFromA[0].eav.gid,
eavsFromA[0].eav.attr,
if (latestFromA is Byte && latestFromB is Byte) latestFromA + latestFromB
else if (latestFromA is Int && latestFromB is Int) latestFromA + latestFromB
else if (latestFromA is Long && latestFromB is Long) latestFromA + latestFromB
else throw QBitException("Unexpected counter value type for eav with gid=$gid, attr=$attr")
)
)
}
else -> listOf((eavsFromA + eavsFromB).maxByOrNull { it.timestamp }!!.eav)
}
}

internal fun findBaseNode(node1: Node<Hash>, node2: Node<Hash>, nodesDepth: Map<Hash, Int>): Node<Hash> {
return when {
node1 == node2 -> node1
Expand Down
30 changes: 26 additions & 4 deletions qbit-core/src/commonMain/kotlin/qbit/schema/SchemaDsl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class SchemaBuilder(private val serialModule: SerializersModule) {
?: throw QBitException("Cannot find descriptor for $type")
val eb = EntityBuilder<T>(descr)
eb.body()
attrs.addAll(schemaFor(descr, eb.uniqueProps))
attrs.addAll(schemaFor(descr, eb.uniqueProps, eb.counters))
}

}
Expand All @@ -33,6 +33,8 @@ class EntityBuilder<T : Any>(private val descr: SerialDescriptor) {

internal val uniqueProps = HashSet<String>()

internal val counters = HashSet<String>()

fun uniqueInt(prop: KProperty1<T, Int>) {
uniqueAttr(prop)
}
Expand All @@ -42,21 +44,41 @@ class EntityBuilder<T : Any>(private val descr: SerialDescriptor) {
}

private fun uniqueAttr(prop: KProperty1<T, *>) {
uniqueProps.add(getAttrName(prop))
}

fun byteCounter(prop: KProperty1<T, Byte>) {
counter(prop)
}

fun intCounter(prop: KProperty1<T, Int>) {
counter(prop)
}

fun longCounter(prop: KProperty1<T, Long>) {
counter(prop)
}

private fun counter(prop: KProperty1<T, *>) {
counters.add(getAttrName(prop))
}

private fun getAttrName(prop: KProperty1<T, *>): String {
val (idx, _) = descr.elementNames
.withIndex().firstOrNull { (_, name) -> name == prop.name }
?: throw QBitException("Cannot find attr for ${prop.name} in $descr")
uniqueProps.add(AttrName(descr, idx).asString())
return AttrName(descr, idx).asString()
}

}

fun schemaFor(rootDesc: SerialDescriptor, unique: Set<String> = emptySet()): List<Attr<Any>> {
fun schemaFor(rootDesc: SerialDescriptor, unique: Set<String> = emptySet(), counters: Set<String> = emptySet()): List<Attr<Any>> {
return rootDesc.elementDescriptors
.withIndex()
.filter { rootDesc.getElementName(it.index) !in setOf("id", "gid") }
.map { (idx, desc) ->
val dataType = DataType.of(desc)
val attr = AttrName(rootDesc, idx).asString()
val dataType = if (attr in counters) DataType.of(desc).counter() else DataType.of(desc)
Attr<Any>(null, attr, dataType.code, attr in unique, dataType.isList())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ internal fun deserialize(ins: Input): Any {
private fun <T : Any> readMark(ins: Input, expectedMark: DataType<T>): Any {
return when (expectedMark) {
QBoolean -> (ins.readByte() == 1.toByte()) as T
QByte, QInt, QLong -> readLong(ins) as T
QByte, QInt, QLong, is QCounter<*> -> readLong(ins) as T

QBytes -> readLong(ins).let { count ->
readBytes(ins, count.toInt()) as T
Expand Down
2 changes: 1 addition & 1 deletion qbit-core/src/commonMain/kotlin/qbit/trx/CommitHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ import qbit.index.InternalDb

internal interface CommitHandler {

suspend fun update(trxLog: TrxLog, newLog: TrxLog, newDb: InternalDb)
suspend fun update(trxLog: TrxLog, baseDb: InternalDb, newLog: TrxLog, newDb: InternalDb)

}
3 changes: 1 addition & 2 deletions qbit-core/src/commonMain/kotlin/qbit/trx/Trx.kt
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ internal class QTrx(
val instance = factor(inst.copy(nextEid = gids.next().eid), curDb::attr, EmptyIterator)
val newLog = trxLog.append(factsBuffer + instance)
try {
base = curDb.with(instance)
commitHandler.update(trxLog, newLog, base)
commitHandler.update(trxLog, base, newLog, curDb.with(instance))
factsBuffer.clear()
} catch (e: Throwable) {
// todo clean up
Expand Down
2 changes: 1 addition & 1 deletion qbit-core/src/commonTest/kotlin/qbit/ConnTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class ConnTest {
)

val newLog = FakeTrxLog(storedLeaf.hash)
conn.update(conn.trxLog, newLog, EmptyDb)
conn.update(conn.trxLog, EmptyDb, newLog, EmptyDb)

assertArrayEquals(newLog.hash.bytes, storage.load(Namespace("refs")["head"]))
}
Expand Down
2 changes: 1 addition & 1 deletion qbit-core/src/commonTest/kotlin/qbit/FakeConn.kt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ internal class FakeConn : Conn(), CommitHandler {
override val head: Hash
get() = TODO("not implemented")

override suspend fun update(trxLog: TrxLog, newLog: TrxLog, newDb: InternalDb) {
override suspend fun update(trxLog: TrxLog, baseDb: InternalDb, newLog: TrxLog, newDb: InternalDb) {
updatesCalls++
}

Expand Down
26 changes: 24 additions & 2 deletions qbit-core/src/commonTest/kotlin/qbit/FunTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ class FunTest {
assertEquals(bomb.country, storedBomb.country)
assertEquals(bomb.optCountry, storedBomb.optCountry)
assertEquals(
listOf(Country(12884901889, "Country1", 0), Country(4294967383, "Country3", 2)),
listOf(Country(12884901889, "Country1", 0), Country(4294967384, "Country3", 2)),
storedBomb.countiesList
)
// todo: assertEquals(bomb.countriesListOpt, storedBomb.countriesListOpt)
Expand Down Expand Up @@ -459,9 +459,9 @@ class FunTest {
trx1.persist(eBrewer.copy(name = "Im different change"))
val trx2 = conn.trx()
trx2.persist(eCodd.copy(name = "Im change 2"))
delay(100)
trx2.persist(pChen.copy(name = "Im different change"))
trx1.commit()
delay(1)
trx2.commit()
conn.db {
assertEquals("Im change 2", it.pull<Scientist>(eCodd.id!!)!!.name)
Expand Down Expand Up @@ -540,6 +540,7 @@ class FunTest {
)
)
trx1.commit()
delay(1)
trx2.commit()
conn.db {
assertEquals("Im change 2", it.pull<Scientist>(eCodd.id!!)!!.name)
Expand Down Expand Up @@ -574,4 +575,25 @@ class FunTest {
assertEquals(Gid(nsk.id!!), trx2EntityAttrValues.first { it.attr.name == "City/region" }.value)
}
}

@JsName("qbit_should_accumulate_concurrent_increments_of_counter")
@Test
fun `qbit should accumulate concurrent increments of counter`() {
runBlocking {
val conn = setupTestSchema()
val counter = IntCounterEntity(1, 10)
val trx = conn.trx()
trx.persist(counter)
trx.commit()

val trx1 = conn.trx()
val trx2 = conn.trx()
trx1.persist(counter.copy(counter = 40))
trx2.persist(counter.copy(counter = 70))
trx1.commit()
trx2.commit()

assertEquals(conn.db().pull<IntCounterEntity>(1)?.counter, 100)
}
}
}
3 changes: 3 additions & 0 deletions qbit-core/src/commonTest/kotlin/qbit/TestSchema.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ val testSchema = schema(internalTestsSerialModule) {
entity(NullableList::class)
entity(NullableRef::class)
entity(IntEntity::class)
entity(IntCounterEntity::class) {
intCounter(IntCounterEntity::counter)
}
entity(ResearchGroup::class)
entity(EntityWithByteArray::class)
entity(EntityWithListOfBytes::class)
Expand Down
Loading