Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

op-based counter #141

Open
wants to merge 5 commits into
base: crdt-wip
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions qbit-core/src/commonMain/kotlin/qbit/api/model/DataTypes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,22 @@ sealed class DataType<out T : Any> {
private val values: Array<DataType<*>>
get() = arrayOf(QBoolean, QByte, QInt, QLong, QString, QBytes, QGid, QRef)

fun ofCode(code: Byte): DataType<*>? =
if (code <= 19) {
values.firstOrNull { it.code == code }
} else {
values.map { it.list() }.firstOrNull { it.code == code }
}

fun <T : Any> ofValue(value: T?): DataType<T>? = when (value) {
fun ofCode(code: Byte): DataType<*>? = when(code) {
in 0..31 -> values.firstOrNull { it.code == code }
in 32..63 -> values.map { it.list() }.firstOrNull { it.code == code }
in 63..95 -> ofCode((code - 64).toByte())?.counter()
else -> null
}

fun <T : Any> ofValue(value: T?): DataType<T> = when (value) {
is Boolean -> QBoolean as DataType<T>
is Byte -> QByte as DataType<T>
is Int -> QInt as DataType<T>
is Long -> QLong as DataType<T>
is String -> QString as DataType<T>
is ByteArray -> QBytes as DataType<T>
is Gid -> QGid as DataType<T>
is List<*> -> value.firstOrNull()?.let { ofValue(it)?.list() } as DataType<T>
is List<*> -> value.firstOrNull()?.let { ofValue(it).list() } as DataType<T>
else -> QRef as DataType<T>
}
}
Expand All @@ -57,7 +57,14 @@ sealed class DataType<out T : Any> {
return QList(this)
}

fun isList(): Boolean = (code.toInt().and(32)) > 0
fun isList(): Boolean = code in 32..63

fun counter(): QCounter<T> {
require(this is QByte || this is QInt || this is QLong) { "Only primitive number values are allowed in counters" }
return QCounter(this)
}

fun isCounter(): Boolean = code in 64..95

fun ref(): Boolean = this == QRef || this is QList<*> && this.itemsType == QRef

Expand All @@ -73,6 +80,7 @@ sealed class DataType<out T : Any> {
is QBytes -> ByteArray::class
is QGid -> Gid::class
is QList<*> -> this.itemsType.typeClass()
is QCounter<*> -> this.primitiveType.typeClass()
QRef -> Any::class
}
}
Expand All @@ -85,6 +93,12 @@ data class QList<out I : Any>(val itemsType: DataType<I>) : DataType<List<I>>()

}

data class QCounter<out I : Any>(val primitiveType: DataType<I>) : DataType<I>() {

override val code = (64 + primitiveType.code).toByte()

}

object QBoolean : DataType<Boolean>() {

override val code = 0.toByte()
Expand Down Expand Up @@ -134,4 +148,4 @@ object QGid : DataType<Gid>() {
}

fun isListOfVals(list: List<Any>?) =
list == null || list.isEmpty() || list.firstOrNull()?.let { DataType.ofValue(it)?.value() } ?: true
list == null || list.isEmpty() || list.firstOrNull()?.let { DataType.ofValue(it).value() } ?: true
5 changes: 3 additions & 2 deletions qbit-core/src/commonMain/kotlin/qbit/index/IndexDb.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import qbit.api.gid.Gid
import qbit.api.model.*
import qbit.api.model.impl.AttachedEntity
import qbit.collections.LimitedPersistentMap
import qbit.trx.deoperationalize
import qbit.typing.typify
import kotlin.reflect.KClass

Expand All @@ -30,8 +31,8 @@ class IndexDb(

private val dataClassesCache = atomic<LimitedPersistentMap<Entity, Any>>(LimitedPersistentMap(1024))

override fun with(facts: Iterable<Eav>): InternalDb {
return IndexDb(index.addFacts(facts), serialModule)
override fun with(facts: Iterable<Eav>): IndexDb {
return IndexDb(index.addFacts(deoperationalize(this, facts.toList())), serialModule)
}

override fun pullEntity(gid: Gid): StoredEntity? {
Expand Down
2 changes: 1 addition & 1 deletion qbit-core/src/commonMain/kotlin/qbit/index/Indexer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class Indexer(
.toList()
.map { it.entities() }
.fold(base ?: IndexDb(Index(), serialModule)) { db, n ->
IndexDb(db.index.add(n), serialModule)
db.with(n.flatMap { it.second })
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import kotlinx.coroutines.flow.toList
import qbit.api.Instances
import qbit.api.gid.Gid
import qbit.api.model.Attr
import qbit.api.model.DataType
import qbit.api.model.Eav
import qbit.api.model.Hash
import qbit.index.RawEntity
Expand Down Expand Up @@ -41,7 +42,9 @@ data class LogsDiff(
resolve(writesFromA[it]!!, writesFromB[it]!!)
}
}
return resolvingEavsByGid.values.map { RawEntity(it.first().gid, it) }
return resolvingEavsByGid
.filter { !it.value.isEmpty() }
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Во-первых тут идея ругается, что надо it.value.isNotEmpty()

Во-вторых чёт мне эта фильтрация в целом не очень нравится, давай хотя бы коммент напишем откуда там пустые значения

В-третьих, чёт меня малёха смущает, что это приведёт к утере атрибута сущности, а тут вроде везде речь о сущностях целиком

.values.map { RawEntity(it.first().gid, it) }
}

fun logAEntities(): List<RawEntity> {
Expand All @@ -52,6 +55,15 @@ data class LogsDiff(
}
}

// This snippet is probably useless and should be wiped out
fun logBOperations(resolveAttrName: (String) -> Attr<Any>?): List<RawEntity> {
return writesFromB.entries
.filter { DataType.ofCode(resolveAttrName(it.key.attr)!!.type)!!.isCounter() }
.flatMap { operationFromB ->
operationFromB.value.map { RawEntity(operationFromB.key.gid, listOf<Eav>(it.eav)) }
}
}

private fun List<PersistedEav>.lastByTimestamp() =
maxByOrNull { it.timestamp }

Expand All @@ -68,6 +80,7 @@ internal fun lastWriterWinsResolve(resolveAttrName: (String) -> Attr<Any>?): (Li
// temporary dirty hack until crdt counter or custom resolution strategy support is implemented
attr == Instances.nextEid -> listOf((eavsFromA + eavsFromB).maxByOrNull { it.eav.value as Int }!!.eav)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это надо выкосить?

attr.list -> (eavsFromA + eavsFromB).map { it.eav }.distinct()
DataType.ofCode(attr.type)!!.isCounter() -> ArrayList()
else -> listOf((eavsFromA + eavsFromB).maxByOrNull { it.timestamp }!!.eav)
}
}
Expand Down
15 changes: 12 additions & 3 deletions qbit-core/src/commonMain/kotlin/qbit/schema/SchemaDsl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class SchemaBuilder(private val serialModule: SerializersModule) {
?: throw QBitException("Cannot find descriptor for $type")
val eb = EntityBuilder<T>(descr)
eb.body()
attrs.addAll(schemaFor(descr, eb.uniqueProps))
attrs.addAll(schemaFor(descr, eb.uniqueProps, eb.counters))
}

}
Expand All @@ -33,6 +33,8 @@ class EntityBuilder<T : Any>(private val descr: SerialDescriptor) {

internal val uniqueProps = HashSet<String>()

internal val counters = HashSet<String>()

fun uniqueInt(prop: KProperty1<T, Int>) {
uniqueAttr(prop)
}
Expand All @@ -48,15 +50,22 @@ class EntityBuilder<T : Any>(private val descr: SerialDescriptor) {
uniqueProps.add(AttrName(descr, idx).asString())
}

fun counter(prop: KProperty1<T, *>) {
val (idx, _) = descr.elementNames
.withIndex().firstOrNull { (_, name) -> name == prop.name }
?: throw QBitException("Cannot find attr for ${prop.name} in $descr")
counters.add(AttrName(descr, idx).asString())
}

}

fun schemaFor(rootDesc: SerialDescriptor, unique: Set<String> = emptySet()): List<Attr<Any>> {
fun schemaFor(rootDesc: SerialDescriptor, unique: Set<String> = emptySet(), counters: Set<String> = emptySet()): List<Attr<Any>> {
return rootDesc.elementDescriptors
.withIndex()
.filter { rootDesc.getElementName(it.index) !in setOf("id", "gid") }
.map { (idx, desc) ->
val dataType = DataType.of(desc)
val attr = AttrName(rootDesc, idx).asString()
val dataType = if (attr in counters) DataType.of(desc).counter() else DataType.of(desc)
Attr<Any>(null, attr, dataType.code, attr in unique, dataType.isList())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ internal fun deserialize(ins: Input): Any {
private fun <T : Any> readMark(ins: Input, expectedMark: DataType<T>): Any {
return when (expectedMark) {
QBoolean -> (ins.readByte() == 1.toByte()) as T
QByte, QInt, QLong -> readLong(ins) as T
QByte, QInt, QLong, is QCounter<*> -> readLong(ins) as T

QBytes -> readLong(ins).let { count ->
readBytes(ins, count.toInt()) as T
Expand Down
49 changes: 49 additions & 0 deletions qbit-core/src/commonMain/kotlin/qbit/trx/Operationalization.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package qbit.trx

import qbit.api.QBitException
import qbit.api.model.DataType
import qbit.api.model.Eav
import qbit.index.InternalDb

fun operationalize(db: InternalDb, facts: List<Eav>): List<Eav> {
return facts.map { operationalizeCounter(db, it) }
}

private fun operationalizeCounter(db: InternalDb, fact: Eav): Eav {
val attr = db.attr(fact.attr)!!
val dataType = DataType.ofCode(attr.type)!!
return if (dataType.isCounter()) {
val previous = db.pullEntity(fact.gid)?.tryGet(attr)
if (previous != null) {
Eav(
fact.gid,
fact.attr,
if (previous is Byte && fact.value is Byte) fact.value - previous
else if (previous is Int && fact.value is Int) fact.value - previous
else if (previous is Long && fact.value is Long) fact.value - previous
else throw QBitException("Unexpected counter value type for $fact")
)
} else fact
} else fact
}

fun deoperationalize(db: InternalDb, facts: List<Eav>): List<Eav> {
return facts.map { deoperationalizeCounter(db, it) }
}

private fun deoperationalizeCounter(db: InternalDb, fact: Eav): Eav {
val attr = db.attr(fact.attr)
return if (attr != null && DataType.ofCode(attr.type)!!.isCounter()) {
val previous = db.pullEntity(fact.gid)?.tryGet(attr)
if (previous != null) {
Eav(
fact.gid,
fact.attr,
if (previous is Byte && fact.value is Byte) previous + fact.value
else if (previous is Int && fact.value is Int) previous + fact.value
else if (previous is Long && fact.value is Long) previous + fact.value
else throw QBitException("Unexpected counter value type for $fact")
)
} else fact
} else fact
}
5 changes: 3 additions & 2 deletions qbit-core/src/commonMain/kotlin/qbit/trx/Trx.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ internal class QTrx(
return QbitWriteResult(entityGraphRoot, curDb)
}
validate(curDb, updatedFacts)
factsBuffer.addAll(updatedFacts)
curDb = curDb.with(updatedFacts)
val operationalizedFacts = operationalize(curDb, updatedFacts)
factsBuffer.addAll(operationalizedFacts)
curDb = curDb.with(operationalizedFacts)

val res = if (facts.entityFacts[entityGraphRoot]!!.firstOrNull()?.gid in entities) {
entityGraphRoot
Expand Down
24 changes: 23 additions & 1 deletion qbit-core/src/commonTest/kotlin/qbit/FunTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ class FunTest {
assertEquals(bomb.country, storedBomb.country)
assertEquals(bomb.optCountry, storedBomb.optCountry)
assertEquals(
listOf(Country(12884901889, "Country1", 0), Country(4294967383, "Country3", 2)),
listOf(Country(12884901889, "Country1", 0), Country(4294967384, "Country3", 2)),
storedBomb.countiesList
)
// todo: assertEquals(bomb.countriesListOpt, storedBomb.countriesListOpt)
Expand Down Expand Up @@ -574,4 +574,26 @@ class FunTest {
assertEquals(Gid(nsk.id!!), trx2EntityAttrValues.first { it.attr.name == "City/region" }.value)
}
}

@JsName("Test_counter_resolving")
@Test
fun `Test counter resolving`() {
runBlocking {
val conn = setupTestSchema()
val counter = IntCounterEntity(1, 10)
val trx = conn.trx()
trx.persist(counter)
trx.commit()

val trx1 = conn.trx()
val trx2 = conn.trx()
trx1.persist(counter.copy(counter = 40))
delay(100)
trx2.persist(counter.copy(counter = 70))
trx1.commit()
trx2.commit()

assertEquals(conn.db().pull<IntCounterEntity>(1)?.counter, 100)
}
}
}
42 changes: 42 additions & 0 deletions qbit-core/src/commonTest/kotlin/qbit/OperationalizationTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package qbit

import qbit.api.gid.nextGids
import qbit.factoring.serializatoin.KSFactorizer
import qbit.test.model.IntCounterEntity
import qbit.trx.operationalize
import qbit.typing.qbitCoreTestsSerialModule
import kotlin.js.JsName
import kotlin.test.Test
import kotlin.test.assertEquals

class OperationalizationTest {

private val gids = qbit.api.gid.Gid(0, 0).nextGids()

val factor = KSFactorizer(qbitCoreTestsSerialModule)::factor

val emptyDb = dbOf(gids, *(bootstrapSchema.values + testSchema).toTypedArray())

@JsName("Counter_not_persisted_in_db_should_pass_as_is")
@Test
fun `Counter not persisted in db should pass as-is`() {
val counterEntity = IntCounterEntity(null, 10)
val facts = operationalize(emptyDb, factor(counterEntity, emptyDb::attr, gids).entityFacts.values.first())
assertEquals(1, facts.size, "Factoring of single entity with single attr should produce single fact")
assertEquals("IntCounterEntity/counter", facts[0].attr)
assertEquals(10, facts[0].value)
}

@JsName("Persisted_counter_should_turn_into_difference")
@Test
fun `Persisted counter should turn into difference`() {
val counterEntity = IntCounterEntity(1, 10)
val updatedDb = emptyDb.with(factor(counterEntity, emptyDb::attr, gids))

counterEntity.counter = 100
val facts = operationalize(updatedDb, factor(counterEntity, updatedDb::attr, gids).entityFacts.values.first())
assertEquals(1, facts.size, "Factoring of single entity with single attr should produce single fact")
assertEquals("IntCounterEntity/counter", facts[0].attr)
assertEquals(90, facts[0].value)
}
}
3 changes: 3 additions & 0 deletions qbit-core/src/commonTest/kotlin/qbit/TestSchema.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ val testSchema = schema(internalTestsSerialModule) {
entity(NullableList::class)
entity(NullableRef::class)
entity(IntEntity::class)
entity(IntCounterEntity::class) {
counter(IntCounterEntity::counter)
}
entity(ResearchGroup::class)
entity(EntityWithByteArray::class)
entity(EntityWithListOfBytes::class)
Expand Down
22 changes: 19 additions & 3 deletions qbit-core/src/commonTest/kotlin/qbit/TrxTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,16 @@ import qbit.api.Attrs
import qbit.api.Instances
import qbit.api.QBitException
import qbit.api.db.Conn
import qbit.api.db.attrIs
import qbit.api.db.pull
import qbit.api.db.query
import qbit.api.gid.Gid
import qbit.api.gid.nextGids
import qbit.api.model.Attr
import qbit.api.system.Instance
import qbit.ns.Key
import qbit.ns.ns
import qbit.platform.runBlocking
import qbit.spi.Storage
import qbit.storage.MemStorage
import qbit.test.model.IntCounterEntity
import qbit.test.model.Region
import qbit.test.model.Scientist
import qbit.test.model.testsSerialModule
Expand Down Expand Up @@ -176,6 +174,24 @@ class TrxTest {
}
}

@JsName("Counter_test")
@Test
fun `Counter test`() {
runBlocking {
val conn = setupTestData()
val counterEntity = IntCounterEntity(1, 10)
val trx1 = conn.trx()
trx1.persist(counterEntity)
trx1.commit()
assertEquals(conn.db().pull<IntCounterEntity>(1)?.counter, counterEntity.counter)
counterEntity.counter = 90
val trx2 = conn.trx()
trx2.persist(counterEntity)
trx2.commit()
assertEquals(conn.db().pull<IntCounterEntity>(1)?.counter, counterEntity.counter)
}
}

private suspend fun openEmptyConn(): Pair<Conn, Storage> {
val storage = MemStorage()
val conn = qbit(storage, testsSerialModule)
Expand Down
Loading