Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

op-based counter #141

Open
wants to merge 5 commits into
base: crdt-wip
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion qbit-core/src/commonMain/kotlin/qbit/api/QbitSelfSchema.kt
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ object Instances {
val nextEid = Attr<Int>(
Gid(1, 5),
"Instance/nextEid",
QInt.code,
QInt.counter().code,
unique = false,
list = false
)
Expand Down
36 changes: 27 additions & 9 deletions qbit-core/src/commonMain/kotlin/qbit/api/model/DataTypes.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import kotlin.reflect.KClass
* - List<Ref>
*/

val scalarRange = 0..31
val listRange = 32..63
val counterRange = 64..95

@Suppress("UNCHECKED_CAST")
sealed class DataType<out T : Any> {

Expand All @@ -31,12 +35,12 @@ sealed class DataType<out T : Any> {
private val values: Array<DataType<*>>
get() = arrayOf(QBoolean, QByte, QInt, QLong, QString, QBytes, QGid, QRef)

fun ofCode(code: Byte): DataType<*>? =
if (code <= 19) {
values.firstOrNull { it.code == code }
} else {
values.map { it.list() }.firstOrNull { it.code == code }
}
fun ofCode(code: Byte): DataType<*>? = when(code) {
in scalarRange -> values.firstOrNull { it.code == code }
in listRange -> ofCode((code - listRange.first).toByte())?.list()
in counterRange -> ofCode((code - counterRange.first).toByte())?.counter()
else -> null
}

fun <T : Any> ofValue(value: T?): DataType<T>? = when (value) {
is Boolean -> QBoolean as DataType<T>
Expand All @@ -46,7 +50,7 @@ sealed class DataType<out T : Any> {
is String -> QString as DataType<T>
is ByteArray -> QBytes as DataType<T>
is Gid -> QGid as DataType<T>
is List<*> -> value.firstOrNull()?.let { ofValue(it)?.list() } as DataType<T>
is List<*> -> value.firstOrNull()?.let { ofValue(it)?.list() } as DataType<T>?
else -> QRef as DataType<T>
}
}
Expand All @@ -57,7 +61,14 @@ sealed class DataType<out T : Any> {
return QList(this)
}

fun isList(): Boolean = (code.toInt().and(32)) > 0
fun isList(): Boolean = code in listRange

fun counter(): QCounter<T> {
require(this is QByte || this is QInt || this is QLong) { "Only primitive number values are allowed in counters" }
return QCounter(this)
}

fun isCounter(): Boolean = code in counterRange

fun ref(): Boolean = this == QRef || this is QList<*> && this.itemsType == QRef

Expand All @@ -73,6 +84,7 @@ sealed class DataType<out T : Any> {
is QBytes -> ByteArray::class
is QGid -> Gid::class
is QList<*> -> this.itemsType.typeClass()
is QCounter<*> -> this.primitiveType.typeClass()
QRef -> Any::class
}
}
Expand All @@ -81,7 +93,13 @@ sealed class DataType<out T : Any> {

data class QList<out I : Any>(val itemsType: DataType<I>) : DataType<List<I>>() {

override val code = (32 + itemsType.code).toByte()
override val code = (listRange.first + itemsType.code).toByte()

}

data class QCounter<out I : Any>(val primitiveType: DataType<I>) : DataType<I>() {

override val code = (counterRange.first + primitiveType.code).toByte()

}

Expand Down
43 changes: 30 additions & 13 deletions qbit-core/src/commonMain/kotlin/qbit/index/Index.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package qbit.index

import qbit.api.db.QueryPred
import qbit.api.gid.Gid
import qbit.api.model.Attr
import qbit.api.model.DataType
import qbit.api.model.Eav
import qbit.api.tombstone
import qbit.platform.assert
Expand Down Expand Up @@ -61,17 +63,17 @@ class Index(
}
}

fun addFacts(facts: List<Eav>): Index =
addFacts(facts as Iterable<Eav>)
fun addFacts(facts: List<Eav>, resolveAttr: (String) -> Attr<*>? = { null }): Index =
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Может resolveAttr уже в поля засунуть, раз такая пьянка?

addFacts(facts as Iterable<Eav>, resolveAttr)

fun addFacts(facts: Iterable<Eav>): Index {
fun addFacts(facts: Iterable<Eav>, resolveAttr: (String) -> Attr<*>? = { null }): Index {
val entities = facts
.groupBy { it.gid }
.map { it.key to it.value }
return add(entities)
return add(entities, resolveAttr)
}

fun add(entities: List<RawEntity>): Index {
fun add(entities: List<RawEntity>, resolveAttr: (String) -> Attr<*>? = { null }): Index {
val newEntities = HashMap(this.entities)

// eavs of removed or updated entities
Expand All @@ -82,12 +84,27 @@ class Index(
val (gid, eavs) = e

val isUpdate = eavs[0].attr != tombstone.name
val obsoleteEntity =
if (isUpdate) {
newEntities.put(gid, e)
} else {
newEntities.remove(gid)
}
val obsoleteEntity = newEntities.get(gid)

if (isUpdate) {
val crdts = obsoleteEntity?.second
?.filter {
val attr = resolveAttr(it.attr)
if (attr == null) {
false
} else {
DataType.ofCode(attr.type)!!.isCounter()
}
}
?.filter {
crdtEav -> eavs.none { it.attr == crdtEav.attr }
}
?: emptyList()
obsoleteEavs.removeAll(crdts)
newEntities.put(gid, RawEntity(gid, eavs + crdts))
d-r-q marked this conversation as resolved.
Show resolved Hide resolved
} else {
newEntities.remove(gid)
}

if (obsoleteEntity != null) {
obsoleteEavs.addAll(obsoleteEntity.second)
Expand All @@ -103,8 +120,8 @@ class Index(
return Index(newEntities, newAveIndex)
}

fun add(e: RawEntity): Index {
return add(listOf(e))
fun add(e: RawEntity, resolveAttr: (String) -> Attr<*>? = { null }): Index {
return add(listOf(e), resolveAttr)
}

fun entityById(eid: Gid): Map<String, List<Any>>? =
Expand Down
5 changes: 3 additions & 2 deletions qbit-core/src/commonMain/kotlin/qbit/index/IndexDb.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import qbit.api.gid.Gid
import qbit.api.model.*
import qbit.api.model.impl.AttachedEntity
import qbit.collections.LimitedPersistentMap
import qbit.trx.deoperationalize
import qbit.typing.typify
import kotlin.reflect.KClass

Expand All @@ -30,8 +31,8 @@ class IndexDb(

private val dataClassesCache = atomic<LimitedPersistentMap<Entity, Any>>(LimitedPersistentMap(1024))

override fun with(facts: Iterable<Eav>): InternalDb {
return IndexDb(index.addFacts(facts), serialModule)
override fun with(facts: Iterable<Eav>): IndexDb {
return IndexDb(index.addFacts(deoperationalize(this, facts.toList()), this::attr), serialModule)
}

override fun pullEntity(gid: Gid): StoredEntity? {
Expand Down
2 changes: 1 addition & 1 deletion qbit-core/src/commonMain/kotlin/qbit/index/Indexer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class Indexer(
.toList()
.map { it.entities() }
.fold(base ?: IndexDb(Index(), serialModule)) { db, n ->
IndexDb(db.index.add(n), serialModule)
db.with(n.flatMap { it.second })
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import kotlinx.coroutines.flow.toList
import qbit.api.Instances
import qbit.api.gid.Gid
import qbit.api.model.Attr
import qbit.api.model.DataType
import qbit.api.model.Eav
import qbit.api.model.Hash
import qbit.index.RawEntity
Expand Down Expand Up @@ -41,7 +42,9 @@ data class LogsDiff(
resolve(writesFromA[it]!!, writesFromB[it]!!)
}
}
return resolvingEavsByGid.values.map { RawEntity(it.first().gid, it) }
return resolvingEavsByGid
.filter { !it.value.isEmpty() }
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Во-первых тут идея ругается, что надо it.value.isNotEmpty()

Во-вторых чёт мне эта фильтрация в целом не очень нравится, давай хотя бы коммент напишем откуда там пустые значения

В-третьих, чёт меня малёха смущает, что это приведёт к утере атрибута сущности, а тут вроде везде речь о сущностях целиком

.values.map { RawEntity(it.first().gid, it) }
}

fun logAEntities(): List<RawEntity> {
Expand All @@ -68,6 +71,7 @@ internal fun lastWriterWinsResolve(resolveAttrName: (String) -> Attr<Any>?): (Li
// temporary dirty hack until crdt counter or custom resolution strategy support is implemented
attr == Instances.nextEid -> listOf((eavsFromA + eavsFromB).maxByOrNull { it.eav.value as Int }!!.eav)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это надо выкосить?

attr.list -> (eavsFromA + eavsFromB).map { it.eav }.distinct()
DataType.ofCode(attr.type)!!.isCounter() -> ArrayList()
else -> listOf((eavsFromA + eavsFromB).maxByOrNull { it.timestamp }!!.eav)
}
}
Expand Down
30 changes: 26 additions & 4 deletions qbit-core/src/commonMain/kotlin/qbit/schema/SchemaDsl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class SchemaBuilder(private val serialModule: SerializersModule) {
?: throw QBitException("Cannot find descriptor for $type")
val eb = EntityBuilder<T>(descr)
eb.body()
attrs.addAll(schemaFor(descr, eb.uniqueProps))
attrs.addAll(schemaFor(descr, eb.uniqueProps, eb.counters))
}

}
Expand All @@ -33,6 +33,8 @@ class EntityBuilder<T : Any>(private val descr: SerialDescriptor) {

internal val uniqueProps = HashSet<String>()

internal val counters = HashSet<String>()

fun uniqueInt(prop: KProperty1<T, Int>) {
uniqueAttr(prop)
}
Expand All @@ -42,21 +44,41 @@ class EntityBuilder<T : Any>(private val descr: SerialDescriptor) {
}

private fun uniqueAttr(prop: KProperty1<T, *>) {
uniqueProps.add(getAttrName(prop))
}

fun byteCounter(prop: KProperty1<T, Byte>) {
counter(prop)
}

fun intCounter(prop: KProperty1<T, Int>) {
counter(prop)
}

fun longCounter(prop: KProperty1<T, Long>) {
counter(prop)
}

private fun counter(prop: KProperty1<T, *>) {
counters.add(getAttrName(prop))
}

private fun getAttrName(prop: KProperty1<T, *>): String {
val (idx, _) = descr.elementNames
.withIndex().firstOrNull { (_, name) -> name == prop.name }
?: throw QBitException("Cannot find attr for ${prop.name} in $descr")
uniqueProps.add(AttrName(descr, idx).asString())
return AttrName(descr, idx).asString()
}

}

fun schemaFor(rootDesc: SerialDescriptor, unique: Set<String> = emptySet()): List<Attr<Any>> {
fun schemaFor(rootDesc: SerialDescriptor, unique: Set<String> = emptySet(), counters: Set<String> = emptySet()): List<Attr<Any>> {
return rootDesc.elementDescriptors
.withIndex()
.filter { rootDesc.getElementName(it.index) !in setOf("id", "gid") }
.map { (idx, desc) ->
val dataType = DataType.of(desc)
val attr = AttrName(rootDesc, idx).asString()
val dataType = if (attr in counters) DataType.of(desc).counter() else DataType.of(desc)
Attr<Any>(null, attr, dataType.code, attr in unique, dataType.isList())
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ internal fun deserialize(ins: Input): Any {
private fun <T : Any> readMark(ins: Input, expectedMark: DataType<T>): Any {
return when (expectedMark) {
QBoolean -> (ins.readByte() == 1.toByte()) as T
QByte, QInt, QLong -> readLong(ins) as T
QByte, QInt, QLong, is QCounter<*> -> readLong(ins) as T

QBytes -> readLong(ins).let { count ->
readBytes(ins, count.toInt()) as T
Expand Down
49 changes: 49 additions & 0 deletions qbit-core/src/commonMain/kotlin/qbit/trx/Operationalization.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package qbit.trx

import qbit.api.QBitException
import qbit.api.model.DataType
import qbit.api.model.Eav
import qbit.index.InternalDb

fun operationalize(db: InternalDb, facts: List<Eav>): List<Eav> {
return facts.map { operationalizeCounter(db, it) }
}

private fun operationalizeCounter(db: InternalDb, fact: Eav): Eav {
val attr = db.attr(fact.attr)!!
val dataType = DataType.ofCode(attr.type)!!
return if (dataType.isCounter()) {
val previous = db.pullEntity(fact.gid)?.tryGet(attr)
if (previous != null) {
Eav(
fact.gid,
fact.attr,
if (previous is Byte && fact.value is Byte) fact.value - previous
else if (previous is Int && fact.value is Int) fact.value - previous
else if (previous is Long && fact.value is Long) fact.value - previous
else throw QBitException("Unexpected counter value type for $fact")
)
} else fact
} else fact
}

fun deoperationalize(db: InternalDb, facts: List<Eav>): List<Eav> {
return facts.map { deoperationalizeCounter(db, it) }
}

private fun deoperationalizeCounter(db: InternalDb, fact: Eav): Eav {
val attr = db.attr(fact.attr)
return if (attr != null && DataType.ofCode(attr.type)!!.isCounter()) {
val previous = db.pullEntity(fact.gid)?.tryGet(attr)
if (previous != null) {
Eav(
fact.gid,
fact.attr,
if (fact.value is Byte) (previous as Number).toByte() + fact.value
else if (fact.value is Int) (previous as Number).toInt() + fact.value
else if (fact.value is Long) (previous as Number).toLong() + fact.value
else throw QBitException("Unexpected counter value type for $fact")
)
} else fact
} else fact
}
5 changes: 3 additions & 2 deletions qbit-core/src/commonMain/kotlin/qbit/trx/Trx.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ internal class QTrx(
return QbitWriteResult(entityGraphRoot, curDb)
}
validate(curDb, updatedFacts)
factsBuffer.addAll(updatedFacts)
curDb = curDb.with(updatedFacts)
val operationalizedFacts = operationalize(curDb, updatedFacts)
factsBuffer.addAll(operationalizedFacts)
curDb = curDb.with(operationalizedFacts)

val res = if (facts.entityFacts[entityGraphRoot]!!.firstOrNull()?.gid in entities) {
entityGraphRoot
Expand Down
Loading