Skip to content

Commit

Permalink
spline #1155 implement tx locking mechanism. Add the TxTemplate class…
Browse files Browse the repository at this point in the history
… that implement Tx lock handling algorithm.
  • Loading branch information
wajda committed Sep 26, 2024
1 parent 42dc886 commit 33b3a1f
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 31 deletions.
3 changes: 1 addition & 2 deletions arangodb-foxx-services/src/main/persistence/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ export type TxNum = number
*/
export type WriteTxInfo = {
num: TxNum
uid: TxId // primary key - universally unique // TODO: double check with ArangoDB if it's indeed unique for entire collection life even if records are constantly removed
sid: TxId // secondary key - for deduplication purpose
uid: TxId
params: TxParams
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { store } from './store'
import { aql, db } from '@arangodb'
import { withTimeTracking } from '../utils/common'
import { TxManager } from './txm'
import { TxTemplate } from './txm/tx-template'


export function storeExecutionEvent(progress: Progress): void {
Expand Down Expand Up @@ -70,8 +71,8 @@ export function storeExecutionEvent(progress: Progress): void {
const progressEdge: Partial<ArangoDB.Edge> =
edge(CollectionName.Progress, progress._key, CollectionName.ExecutionPlan, progress.planKey, progress._key)

const wtxInfo: WriteTxInfo = TxManager.startWrite(
progress._key,
const txTemplate = new TxTemplate(
`${CollectionName.Progress}/${progress._key}`,
{
execEventInfo: {
_key: progress._key,
Expand All @@ -81,14 +82,9 @@ export function storeExecutionEvent(progress: Progress): void {
}
})

try {
txTemplate.doWrite((wtxInfo: WriteTxInfo) => {
store.insertOne(progressWithPlanDetails, CollectionName.Progress, wtxInfo)
store.insertOne(progressEdge, CollectionName.ProgressOf, wtxInfo)
TxManager.commit(wtxInfo)
}
catch (e) {
TxManager.rollback(wtxInfo)
throw e
}
})
})
}
17 changes: 5 additions & 12 deletions arangodb-foxx-services/src/main/services/execution-plan-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ import { ExecutionPlanPersistentModel } from '../../external/api.model'
import { CollectionName, WriteTxInfo } from '../persistence/model'
import { store } from './store'
import { withTimeTracking } from '../utils/common'
import { TxManager } from './txm'
import { TxTemplate } from './txm/tx-template'


export function storeExecutionPlan(eppm: ExecutionPlanPersistentModel): void {
const execPlanKey = eppm.executionPlan._key
withTimeTracking(`STORE PLAN ${execPlanKey}`, () => {
const txInfo: WriteTxInfo = TxManager.startWrite(
execPlanKey,
const txTemplate = new TxTemplate(
`${CollectionName.ExecutionPlan}/${execPlanKey}`,
{
execPlanInfo: {
_key: execPlanKey,
}
})

try {
txTemplate.doWrite((txInfo: WriteTxInfo) => {
// execution plan
store.insertOne(eppm.executes, CollectionName.Executes, txInfo)
store.insertMany(eppm.depends, CollectionName.Depends, txInfo)
Expand Down Expand Up @@ -61,14 +61,7 @@ export function storeExecutionPlan(eppm: ExecutionPlanPersistentModel): void {
// expression
store.insertMany(eppm.expressions, CollectionName.Expression, txInfo)
store.insertMany(eppm.takes, CollectionName.Takes, txInfo)

TxManager.commit(txInfo)

}
catch (e) {
TxManager.rollback(txInfo)
throw e
}
})
})
}

31 changes: 24 additions & 7 deletions arangodb-foxx-services/src/main/services/txm/tx-manager-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

import {TxManager} from './tx-manager'
import { TxManager } from './tx-manager'
import {
AuxCollectionName,
CollectionName,
Expand All @@ -27,8 +27,8 @@ import {
TxParams,
WriteTxInfo
} from '../../persistence/model'
import {store} from '../store'
import {aql, db} from '@arangodb'
import { store } from '../store'
import { aql, db } from '@arangodb'
import * as Logger from '../../utils/logger'
import UpdateResult = ArangoDB.UpdateResult

Expand All @@ -38,6 +38,20 @@ import UpdateResult = ArangoDB.UpdateResult
*/
const MAX_GET_TX_NUM_ATTEMPTS = 50

/**
* Timestamp in milliseconds since Unix epoch
*/
type Timestamp = number

/**
* Transaction record.
*/
type TxRecord = {
sid: string, // secondary transaction ID - represents a logical transaction
started: Timestamp,
obsolete?: Timestamp,
}

export class TxManagerImpl implements TxManager {

private nextTxNumber(): TxNum {
Expand Down Expand Up @@ -80,8 +94,12 @@ export class TxManagerImpl implements TxManager {
// (opposite to one for the READ transaction) as follows:

// First,
// register a new Tx ID in the transaction registry
const txId: TxId = store.insertOne({}, AuxCollectionName.TxInfo)._key
// register a new Tx record in the transaction registry and get its ID
const txRec: TxRecord = {
sid,
started: Date.now()
}
const txId: TxId = store.insertOne(txRec, AuxCollectionName.TxInfo)._key

// Second,
// obtain a next global number to fix a transaction position on a serializable time axis.
Expand All @@ -91,10 +109,9 @@ export class TxManagerImpl implements TxManager {
const wtxInfo: WriteTxInfo = {
num: txNum,
uid: txId,
sid: sid,
params: txParams,
}
Logger.debug('[TX] WRITE STARTED', wtxInfo)
Logger.debug('[TX] WRITE STARTED', wtxInfo, txRec)
return wtxInfo
}

Expand Down
59 changes: 59 additions & 0 deletions arangodb-foxx-services/src/main/services/txm/tx-template.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import {TxParams, WriteTxInfo} from '../../persistence/model'
import {TxManager} from './index'
import {aql, db} from '@arangodb'


export class TxTemplate {
constructor(
private sid: string,
private txParams: TxParams = null) {
}

public doWrite<T>(callback: (wtxInfo: WriteTxInfo) => T): T {
let wtxInfo: WriteTxInfo
try {
wtxInfo = TxManager.startWrite(
this.sid,
this.txParams
)
}
catch (e) {
// TODO: Mark the conflicting transaction as obsolete
// db._query(aql`
// // WITH txInfo
// // FIND the transaction record with SID
// // AND obsolete==null
// // AND started < DATE_NOW() - 1h
// // UPDATE tx.obsolete = DATE_NOW()
// // RETURN ...
// `).next()
throw e
}

try {
const retVal: T = callback(wtxInfo)
TxManager.commit(wtxInfo)
return retVal
}
catch (e) {
TxManager.rollback(wtxInfo)
throw e
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ object CollectionDef {
override def name: String = "txInfo"

override def indexDefs: Seq[IndexDef] = Seq(
IndexDef(Seq("sid"), (new PersistentIndexOptions).unique(true))
IndexDef(Seq("sid", "terminated"), (new PersistentIndexOptions).unique(true))
)
}

Expand Down

0 comments on commit 33b3a1f

Please sign in to comment.