Skip to content

Commit

Permalink
Merge pull request #111 from d-r-q/q103/mpp/concurrent-hash-map
Browse files Browse the repository at this point in the history
Q103/mpp/concurrent hash map
  • Loading branch information
Aleksey authored May 30, 2020
2 parents cd134b0 + f918ca6 commit 1fd7eb0
Show file tree
Hide file tree
Showing 53 changed files with 917 additions and 672 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ subprojects {
maven { url = URI("https://kotlin.bintray.com/kotlinx") }
}

if (project.name in setOf("qbit-core")) {
if (project.name in setOf("qbit-core", "qbit-storages-tests")) {
println("Enabling nodejs tests for ${project.name}")

configure<NodeExtension> {
Expand Down
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ kotlin.code.style=official
kotlin_version=1.4-M1
systemProp.kotlin_version=1.4-M1
kotlin_serialization_version=0.20.0-1.4-M1
kotlin_coroutines_version=1.3.5-1.4-M1
ktor_version=1.3.2-1.4-M1
10 changes: 6 additions & 4 deletions qbit-core-jvm/src/commonMain/kotlin/qbit/BootstrapJvm.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@ import qbit.serialization.NodeData
import qbit.serialization.NodesStorage
import qbit.serialization.Root
import qbit.spi.Storage
import qbit.storage.SerializedStorage

internal fun bootstrap(storage: Storage, dbUuid: DbUuid, factor: Factor, serialModule: SerialModule): Conn {
internal suspend fun bootstrap(storage: Storage, dbUuid: DbUuid, factor: Factor, serialModule: SerialModule): Conn {
val serializedStorage = SerializedStorage(storage)
val trx = listOf(Attrs.name, Attrs.type, Attrs.unique, Attrs.list, Instances.iid, Instances.forks, Instances.nextEid, tombstone)
.flatMap { it.toFacts() }
.plus(factor(protoInstance, bootstrapSchema::get, EmptyIterator))

val root = Root(null, dbUuid, currentTimeMillis(), NodeData(trx.toTypedArray()))
val storedRoot = NodesStorage(storage).store(root)
storage.add(Namespace("refs")["head"], storedRoot.hash.bytes)
return QConn(serialModule, dbUuid, storage, storedRoot, factor)
val storedRoot = NodesStorage(serializedStorage).store(root)
serializedStorage.add(Namespace("refs")["head"], storedRoot.hash.bytes)
return QConn(serialModule, dbUuid, serializedStorage, storedRoot, factor)
}

internal fun Attr<*>.toFacts(): List<Eav> = listOf(Eav(this.id!!, Attrs.name.name, this.name),
Expand Down
6 changes: 3 additions & 3 deletions qbit-core-jvm/src/commonMain/kotlin/qbit/Conn.kt
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class SchemaValidator : SerialModuleCollector {
}

}
fun qbit(storage: Storage, appSerialModule: SerialModule): Conn {
suspend fun qbit(storage: Storage, appSerialModule: SerialModule): Conn {
val iid = Iid(1, 4)
val dbUuid = DbUuid(iid)
val headHash = storage.load(Namespace("refs")["head"])
Expand Down Expand Up @@ -108,15 +108,15 @@ internal class QConn(serialModule: SerialModule, override val dbUuid: DbUuid, va
return QTrx(db.pull(Gid(dbUuid.iid, theInstanceEid))!!, trxLog, db, this, factor)
}

override fun <R : Any> persist(e: R): WriteResult<R?> {
override suspend fun <R : Any> persist(e: R): WriteResult<R?> {
return with(trx()) {
val wr = persist(e)
commit()
wr
}
}

override fun update(trxLog: TrxLog, newLog: TrxLog, newDb: InternalDb) {
override suspend fun update(trxLog: TrxLog, newLog: TrxLog, newDb: InternalDb) {
if (this.trxLog != trxLog) {
throw ConcurrentModificationException("Concurrent transactions isn't supported yet")
}
Expand Down
2 changes: 1 addition & 1 deletion qbit-core-jvm/src/commonMain/kotlin/qbit/api/db/Conn.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ abstract class Conn internal constructor() {

abstract fun trx(): Trx

abstract fun <R : Any> persist(e: R): WriteResult<R?>
abstract suspend fun <R : Any> persist(e: R): WriteResult<R?>

abstract val head: Hash

Expand Down
2 changes: 1 addition & 1 deletion qbit-core-jvm/src/commonMain/kotlin/qbit/api/db/Trx.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ abstract class Trx internal constructor() {

abstract fun <R : Any> persist(entityGraphRoot: R): WriteResult<R?>

abstract fun commit()
abstract suspend fun commit()

abstract fun rollback()

Expand Down
14 changes: 0 additions & 14 deletions qbit-core-jvm/src/commonMain/kotlin/qbit/platform/Collections.kt
Original file line number Diff line number Diff line change
@@ -1,19 +1,5 @@
package qbit.platform

expect class ConcurrentHashMap<K, V>() {
fun putIfAbsent(key: K, value: V): V?
fun replace(key: K, value: V): V?
operator fun get(key: K): V?
val keys: KeySetView<K, V>
fun containsKey(key: K): Boolean
}

expect class KeySetView<K, V>

expect fun <K, V> KeySetView<K, V>.asSequence(): Sequence<K>

expect inline fun <K, V> ConcurrentHashMap<K, V>.getOrPut(key: K, defaultValue: () -> V): V

expect class WeakHashMap<K, V>() {
operator fun get(key: K): V?
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,26 +1,33 @@
package qbit.serialization

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.withContext
import qbit.api.QBitException
import qbit.api.model.Hash
import qbit.api.model.hash
import qbit.ns.Namespace
import qbit.platform.asInput
import qbit.platform.createSingleThreadCoroutineDispatcher
import qbit.spi.Storage

private val nodes = Namespace("nodes")

class NodesStorage(private val storage: Storage) : (NodeRef) -> NodeVal<Hash>? {
class NodesStorage(private val storage: Storage) :
(NodeRef) -> NodeVal<Hash>?,
CoroutineScope by CoroutineScope(createSingleThreadCoroutineDispatcher("Nodes writer")) {

fun store(n: NodeVal<Hash?>): NodeVal<Hash> {
val data = SimpleSerialization.serializeNode(n)
val hash = hash(data)
if (n.hash != null && n.hash != hash) {
throw AssertionError("NodeVal has hash ${n.hash.toHexString()}, but it's serialization has hash ${hash.toHexString()}")
}
if (!storage.hasKey(hash.key())) {
storage.add(hash.key(), data)
suspend fun store(n: NodeVal<Hash?>): NodeVal<Hash> {
return withContext(this.coroutineContext) {
val data = SimpleSerialization.serializeNode(n)
val hash = hash(data)
if (n.hash != null && n.hash != hash) {
throw AssertionError("NodeVal has hash ${n.hash.toHexString()}, but it's serialization has hash ${hash.toHexString()}")
}
if (!storage.hasKey(hash.key())) {
storage.add(hash.key(), data)
}
toHashedNode(n, hash)
}
return toHashedNode(n, hash)
}

override fun invoke(ref: NodeRef): NodeVal<Hash>? {
Expand All @@ -30,7 +37,7 @@ class NodesStorage(private val storage: Storage) : (NodeRef) -> NodeVal<Hash>? {
if (hash != ref.hash) {
throw QBitException("Corrupted node. Node hash is ${ref.hash}, but data hash is $hash")
}
return toHashedNode(SimpleSerialization.deserializeNode(value.asInput()) , hash)
return toHashedNode(SimpleSerialization.deserializeNode(value.asInput()), hash)
} catch (e: Exception) {
throw QBitException(cause = e)
}
Expand Down
33 changes: 0 additions & 33 deletions qbit-core-jvm/src/commonMain/kotlin/qbit/spi/Storage.kt

This file was deleted.

18 changes: 0 additions & 18 deletions qbit-core-jvm/src/commonMain/kotlin/qbit/storage/Clone.kt

This file was deleted.

41 changes: 0 additions & 41 deletions qbit-core-jvm/src/commonMain/kotlin/qbit/storage/MemStorage.kt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ import qbit.index.InternalDb

internal interface CommitHandler {

fun update(trxLog: TrxLog, newLog: TrxLog, newDb: InternalDb)
suspend fun update(trxLog: TrxLog, newLog: TrxLog, newDb: InternalDb)

}
2 changes: 1 addition & 1 deletion qbit-core-jvm/src/commonMain/kotlin/qbit/trx/Trx.kt
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ internal class QTrx(
return QbitWriteResult(res, curDb!!)
}

override fun commit() {
override suspend fun commit() {
ensureReady()
if (factsBuffer.isEmpty()) {
return
Expand Down
4 changes: 2 additions & 2 deletions qbit-core-jvm/src/commonMain/kotlin/qbit/trx/TrxLog.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ internal interface TrxLog {

val hash: Hash

fun append(facts: Collection<Eav>): TrxLog
suspend fun append(facts: Collection<Eav>): TrxLog

}

internal class QTrxLog(private val head: NodeVal<Hash>, private val writer: Writer) : TrxLog {

override val hash = head.hash

override fun append(facts: Collection<Eav>): TrxLog {
override suspend fun append(facts: Collection<Eav>): TrxLog {
val newHead = writer.store(head, facts)
return QTrxLog(newHead, writer)
}
Expand Down
2 changes: 1 addition & 1 deletion qbit-core-jvm/src/commonMain/kotlin/qbit/trx/Writer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import qbit.serialization.NodesStorage

class Writer(private val storage: NodesStorage, private val dbUuid: DbUuid) {

fun store(head: Node<Hash>, e: Collection<Eav>): NodeVal<Hash> {
suspend fun store(head: Node<Hash>, e: Collection<Eav>): NodeVal<Hash> {
try {
if (!storage.hasNode(head)) {
throw QBitException("Could not store child for node with hash=${head.hash}, because it's not exists in the storage")
Expand Down
17 changes: 13 additions & 4 deletions qbit-core-jvm/src/commonTest/kotlin/qbit/BootstrapTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package qbit
import kotlinx.serialization.modules.plus
import qbit.api.Attrs
import qbit.api.Instances
import qbit.api.db.Conn
import qbit.api.db.attrIs
import qbit.api.db.query
import qbit.api.gid.Iid
import qbit.api.model.Attr
import qbit.api.system.DbUuid
import qbit.api.system.Instance
import qbit.ns.Namespace
import qbit.platform.runBlocking
import qbit.storage.MemStorage
import qbit.test.model.testsSerialModule
import kotlin.test.Test
Expand All @@ -21,13 +23,20 @@ import kotlin.test.assertTrue
class BootstrapTest {

private val storage = MemStorage()
private val newDb = bootstrap(storage, DbUuid(Iid(1, 4)), testSchemaFactorizer::factor, qbitSerialModule + testsSerialModule)

private val newDb: Conn

init {
newDb = runBlocking { bootstrap(storage, DbUuid(Iid(1, 4)), testSchemaFactorizer::factor, qbitSerialModule + testsSerialModule) }
}

@Test
fun testInit() {
val db = qbit(storage, testsSerialModule)
assertNotNull(db)
assertTrue(storage.keys(Namespace("nodes")).isNotEmpty())
runBlocking {
val db = qbit(storage, testsSerialModule)
assertNotNull(db)
assertTrue(storage.keys(Namespace("nodes")).isNotEmpty())
}
}

@Test
Expand Down
Loading

0 comments on commit 1fd7eb0

Please sign in to comment.